""" @desc 数据库连接 @auth chenkai @date 2020/11/19 """ from model.DataBaseOperation import MysqlOperation from model.log import logger import yaml import os from clickhouse_driver.client import Client import pandas as pd log = logger() class MysqlUtils: _quchen_text = None _zx=None _dm=None def __init__(self): p_path =os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) path = os.path.join(p_path,"config", "db_config.yaml") f = open(path, encoding="utf-8") self.config = yaml.load(f.read(), Loader=yaml.FullLoader) @property def quchen_text(self): conf = self.config['quchen_text'] self._quchen_text = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._quchen_text @property def zx(self): conf = self.config['zx'] self._zx = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._zx @property def dm(self): conf = self.config['dm'] self._dm = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._dm def find_db(self, db): if db == "quchen_text": self._quchen_text = self._quchen_text return self._quchen_text else: log.debug("输入数据库有误") def close(self): if self._quchen_text: self._quchen_text.cursor.close() self._quchen_text.conn.close() if self._zx: self._zx.cursor.close() self._zx.conn.close() if self._dm: self._dm.cursor.close() self._dm.conn.close() class CkUtils: def __init__(self): self.client = Client(host='cc-bp1ah87xjy8yb884s.public.clickhouse.ads.aliyuncs.com', user='qc', password='Qc_1234567', port='3306', send_receive_timeout=150) def execute(self, sql): return self.client.execute(sql) def getDataList(self,sql): data = self.client.execute(sql) return [list(i) for i in data] def getDataOne(self,sql): data = self.client.execute(sql) return data[0][0] def getData_pd(self, sql, col): """ :param sql: :param col: [[]] :return: """ data = self.execute(sql) df = pd.DataFrame(data, columns=col) return df def getColumns(self, table, is_list=False): """默认返回列表""" data = self.execute("desc " + table) li = [] str = '' for i in data: li.append(i[0]) str += i[0] + ',' if is_list: return li else: return str[:-1] def insertMany(self,table,col,data): """ :param table: 表名 srt :param col: 字段名 srt eg: ”a,b,c“ :param data: tuple/list :return: """ max=200 sql="insert into {} ({}) values ".format(table,col) if len(data) == 0: log.debug("data.len==0") return if len(data) <= max: sql = sql+str(data)[1:-1] # log.info(sql) # log.info("insert {} rows".format(len(data))) self.execute(sql) return else: sql2=sql+str(data[:max])[1:-1] # log.info(sql2) self.execute(sql2) # log.info("insert {} rows".format(max)) self.insertMany(table,col,data[max:]) if __name__ == '__main__': p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))