ck пре 4 година
родитељ
комит
cc8c8eb0fe

+ 4 - 5
app/etl/rds_order_ck.py

@@ -3,17 +3,16 @@
 @date 20201127
 """
 
-from model.util.DataBaseUtils import MysqlUtils
-from model.util.CkUtils import CkUtils
-from model.util.DateUtils import DateUtils
+from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DateUtils import DateUtils
 from datetime import datetime
-from model.common.log import logger
+from model.log import logger
 log=logger()
 db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
 import sys
-from model.util.DingTalkUtils import DingTalkUtils
+from model.DingTalkUtils import DingTalkUtils
 
 
 def run(mydt):

+ 1 - 1
example/ding_test.py

@@ -1,2 +1,2 @@
-from model.util.DingTalkUtils import DingTalkUtils
+from model.DingTalkUtils import DingTalkUtils
 DingTalkUtils.send("这是测试文本", "18860455786")

+ 1 - 1
example/insert_example.py

@@ -3,7 +3,7 @@ Example
 pandas操作MySQL
 以pandas.dataframe 插入mysql
 """
-from model.util.DataBaseUtils import MysqlUtils
+from model.DataBaseUtils import MysqlUtils
 import pandas as pd
 db = MysqlUtils()
 

+ 1 - 1
example/select_example.py

@@ -4,7 +4,7 @@ pandas操作MySQL
 
  查数据,结果用dataframe显示
 """
-from model.util.DataBaseUtils import MysqlUtils
+from model.DataBaseUtils import MysqlUtils
 db = MysqlUtils()
 
 

+ 23 - 32
model/util/DataBaseOperation.py → model/DataBaseOperation.py

@@ -2,15 +2,13 @@
 @desc 数据库操作方法封装
 @auth  chenkai
 @date 2020/11/19
-@py_version py3.7
+@py_version py3.6
 """
 import pymysql
-# from clickhouse_sqlalchemy import make_session
-# from sqlalchemy import create_engine
 import logging as log
 import pandas as pd
 import time
-from model.common.log import logger
+from model.log import logger
 log = logger()
 pd.set_option('display.max_columns', None)
 pd.set_option('display.width', 1000)
@@ -579,34 +577,27 @@ class MysqlOperation:
         log.debug('update %s rows, cost: %s' % (len(key_values), time.time() - t0))
 
 
-# class CkOperation:
-#     cursor = None
-#     session = None
-#
-#     def __init__(self, conf):
-#         try:
-#             connection = 'clickhouse://{user}:{passwd}@{host}:{port}/{db}'.format(**conf)
-#             engine = create_engine(connection, pool_size=100, pool_recycle=3600, pool_timeout=20)
-#             self.session = make_session(engine)
-#
-#         except Exception as e:
-#             log.info(e)
-#
-#     def execute(self, sql):
-#         self.cursor = self.session.execute(sql)
-#         try:
-#             fields = self.cursor._metadata.keys
-#             return [dict(zip(fields, item)) for item in self.cursor.fetchall()]
-#         except Exception as e:
-#             log.info(e)
-#
-#     def getData_pd(self, sql):
-#         li = self.execute(sql)
-#         return pd.DataFrame(li)
-#
-#     def getOne(self, sql):
-#         li = self.execute(sql)
-#         return [i for i in li[0].values()][0]
+    def getColumn(self,table,flag=0):
+        "获取表的所有列"
+        sql="SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
+            "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
+        self.cursor.execute(sql)
+        a= self.cursor.fetchall()
+        str=''
+        li=[]
+        for i in a:
+            str+=i[0]+','
+            li.append(i[0])
+
+        if flag:
+            return li
+        else:
+            return str[:-1]
+
+
+
+
+
 
 
 

+ 93 - 0
model/DataBaseUtils.py

@@ -0,0 +1,93 @@
+"""
+@desc 数据库连接
+@auth chenkai
+@date 2020/11/19
+"""
+from model.DataBaseOperation import MysqlOperation
+from model.log import logger
+import yaml
+import os
+from clickhouse_driver.client import Client
+log = logger()
+
+
+class MysqlUtils:
+    _quchen_text = None
+
+    def __init__(self):
+        p_path =os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
+        path = os.path.join(p_path,"config", "db_config.yaml")
+        f = open(path, encoding="utf-8")
+        self.config = yaml.load(f.read(), Loader=yaml.FullLoader)
+
+    @property
+    def quchen_text(self):
+
+        conf = self.config['quchen_text']
+        self._quchen_text = MysqlOperation(host=conf['host'],
+                                      user=conf['user'],
+                                      passwd=conf['passwd'],
+                                      db=conf['db'])
+        return self._quchen_text
+
+
+
+    def find_db(self, db):
+
+        if db == "quchen_text":
+            self._quchen_text = self._quchen_text
+            return self._quchen_text
+
+        else:
+            log.debug("输入数据库有误")
+
+    def close(self):
+        if self._quchen_text:
+            self._quchen_text.cursor.close()
+            self._quchen_text.conn.close()
+
+
+class CkUtils:
+
+    def __init__(self):
+        self.client = Client(host='cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com',
+                           user='qucheng_ck',
+                           password='Qc123456',
+                           port='3306',
+                           send_receive_timeout=5)
+
+    def execute(self, sql):
+        self.client.execute(sql)
+
+
+    def insertMany(self,table,col,data):
+        """
+        :param table: 表名 srt
+        :param col:   字段名 srt   eg: ”a,b,c“
+        :param data:  tuple/list
+        :return:
+        """
+        max=200
+        sql="insert into {} ({}) values ".format(table,col)
+
+        if len(data) == 0:
+            log.debug("data.len==0")
+            return
+        if len(data) <= max:
+            sql = sql+str(data)[1:-1]
+            # log.info(sql)
+            # log.info("insert {} rows".format(len(data)))
+            self.execute(sql)
+            return
+        else:
+
+            sql2=sql+str(data[:max])[1:-1]
+            # log.info(sql2)
+            self.execute(sql2)
+            # log.info("insert {} rows".format(max))
+            self.insertMany(table,col,data[max:])
+
+
+if __name__ == '__main__':
+    p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+    print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))

+ 0 - 0
model/util/DateUtils.py → model/DateUtils.py


+ 0 - 0
model/util/DingTalkUtils.py → model/DingTalkUtils.py


+ 0 - 2
model/common/__init__.py

@@ -1,2 +0,0 @@
-
-

+ 3 - 6
model/common/log.py → model/log.py

@@ -18,7 +18,7 @@ class logger(object):
         if not self.logger.handlers:
             # 如果self.logger没有handler, 就执行以下代码添加handler
             self.logger.setLevel(logging.DEBUG)
-            rootpath =os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+            rootpath =os.path.dirname(os.path.dirname(__file__))
             self.log_path = rootpath + '/logs'
             if not os.path.exists(self.log_path):
                 os.makedirs(self.log_path)
@@ -63,8 +63,5 @@ class logger(object):
 
 
 if __name__ == "__main__":
-    logger = logger()
-    logger.info("12345")
-    logger.debug("12345")
-    logger.warning("12345")
-    logger.error("12345")
+    rootpath = os.path.dirname(os.path.dirname(__file__))
+    print(rootpath)

+ 0 - 51
model/util/CkUtils.py

@@ -1,51 +0,0 @@
-from clickhouse_driver.client import Client
-from model.common.log import logger
-log=logger()
-
-class CkUtils:
-
-    def __init__(self):
-        self.client = Client(host='cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com',
-                           user='qucheng_ck',
-                           password='Qc123456',
-                           port='3306',
-                           send_receive_timeout=5)
-
-    def execute(self, sql):
-        self.client.execute(sql)
-
-
-    def insertMany(self,table,col,data):
-        """
-        :param table: 表名 srt
-        :param col:   字段名 srt   eg: ”a,b,c“
-        :param data:  tuple
-        :return:
-        """
-        max=200
-        sql="insert into {} ({}) values ".format(table,col)
-
-        if len(data) == 0:
-            log.debug("data.len==0")
-            return
-        if len(data) <= max:
-            sql = sql+str(data)[1:-1]
-            # log.info(sql)
-            # log.info("insert {} rows".format(len(data)))
-            self.execute(sql)
-            return
-        else:
-
-            sql2=sql+str(data[:max])[1:-1]
-            # log.info(sql2)
-            self.execute(sql2)
-            # log.info("insert {} rows".format(max))
-            self.insertMany(table,col,data[max:])
-
-
-
-if __name__ == '__main__':
-    ck= CkUtils()
-    a =ck.execute("desc order")
-    print(a)
-

+ 0 - 48
model/util/DataBaseUtils.py

@@ -1,48 +0,0 @@
-"""
-@desc 数据库连接
-@auth chenkai
-@date 2020/11/19
-"""
-from .DataBaseOperation import *
-from model.common.log import logger
-import yaml
-import os
-
-log = logger()
-
-
-class MysqlUtils:
-    _quchen_text = None
-
-    def __init__(self):
-        p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
-        path = os.path.join(p_path,"config", "db_config.yaml")
-        f = open(path, encoding="utf-8")
-        self.config = yaml.load(f.read(), Loader=yaml.FullLoader)
-
-    @property
-    def quchen_text(self):
-
-        conf = self.config['quchen_text']
-        self._quchen_text = MysqlOperation(host=conf['host'],
-                                      user=conf['user'],
-                                      passwd=conf['passwd'],
-                                      db=conf['db'])
-        return self._quchen_text
-
-
-
-    def find_db(self, db):
-
-        if db == "quchen_text":
-            self._quchen_text = self._quchen_text
-            return self._quchen_text
-
-        else:
-            log.debug("输入数据库有误")
-
-    def close(self):
-        if self._quchen_text:
-            self._quchen_text.cursor.close()
-            self._quchen_text.conn.close()
-

+ 0 - 0
model/util/__init__.py