"""
@desc Database connections
@auth chenkai
@date 2020/11/19
"""
import logging
import os

import pandas as pd
import yaml
from clickhouse_driver.client import Client

from model.DataBaseOperation import MysqlOperation


def db_config():
    """Load ``config/db_config.yaml`` from the directory above this module's.

    :return: the parsed YAML document; the classes below index it by
        connection name (e.g. ``config['zx']``, ``config['clickhouse']``)
    """
    p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
    path = os.path.join(p_path, "config", "db_config.yaml")
    # The context manager guarantees the file handle is released.
    with open(path, encoding="utf-8") as f:
        return yaml.load(f.read(), Loader=yaml.FullLoader)


class MysqlUtils:
    """Lazily created MySQL handles, one per section of the DB config.

    Each property builds its ``MysqlOperation`` on first access and returns
    the same handle on later accesses, so ``close`` can release every
    connection that was opened through this object.
    """

    _quchen_text = None
    _zx = None
    _zx_platform = None
    _zx_ads = None
    _dm = None
    _zx_test = None

    # Every attribute that may hold an open handle; ``close`` walks this list.
    _SLOTS = ("_quchen_text", "_zx", "_zx_platform", "_zx_ads", "_dm", "_zx_test")

    def __init__(self):
        self.config = db_config()

    def _connection(self, slot, section):
        """Return the handle cached in ``slot``, creating it from ``section``.

        :param slot: name of the instance attribute that caches the handle
        :param section: key of the connection settings inside ``self.config``
        :return: the ``MysqlOperation`` for that section
        """
        handle = getattr(self, slot)
        if handle is None:
            conf = self.config[section]
            handle = MysqlOperation(host=conf['host'],
                                    user=conf['user'],
                                    passwd=conf['passwd'],
                                    db=conf['db'])
            setattr(self, slot, handle)
        return handle

    @property
    def quchen_text(self):
        """Handle for the ``quchen_text`` config section."""
        return self._connection("_quchen_text", "quchen_text")

    @property
    def zx(self):
        """Handle for the ``zx`` config section."""
        return self._connection("_zx", "zx")

    @property
    def zx_platform(self):
        """Handle for the ``zx_platform`` config section."""
        # Has its own slot so it no longer overwrites the ``zx`` handle.
        return self._connection("_zx_platform", "zx_platform")

    @property
    def zx_ads(self):
        """Handle for the ``zx_ads`` config section."""
        return self._connection("_zx_ads", "zx_ads")

    @property
    def zx_test(self):
        """Handle for the ``zx_test`` config section."""
        return self._connection("_zx_test", "zx_test")

    @property
    def dm(self):
        """Handle for the ``dm`` config section."""
        return self._connection("_dm", "dm")

    def find_db(self, db):
        """Look up a connection by name.

        :param db: database name; only ``"quchen_text"`` is recognised
        :return: the ``quchen_text`` handle, or ``None`` (after logging) for
            any other name
        """
        if db == "quchen_text":
            # Go through the property so the connection exists even when it
            # has not been accessed before.
            return self.quchen_text
        logging.info("输入数据库有误")
        return None

    def close(self):
        """Close the cursor and connection of every handle that was opened."""
        for slot in self._SLOTS:
            handle = getattr(self, slot)
            if handle is None:
                continue
            # Clear the slot first so a later property access reconnects
            # instead of handing out a closed handle.
            setattr(self, slot, None)
            try:
                handle.cursor.close()
            finally:
                handle.conn.close()


class CkUtils:
    """Thin wrapper around a ``clickhouse_driver`` client."""

    def __init__(self):
        self.config = db_config()
        conf = self.config['clickhouse']
        self.client = Client(host=conf['host'],
                             user=conf['user'],
                             password=conf['passwd'],
                             port=conf['port'],
                             send_receive_timeout=5)

    def execute(self, sql):
        """Run ``sql`` on the client and return whatever the driver returns."""
        return self.client.execute(sql)

    def getData_pd(self, sql, col):
        """Run ``sql`` and wrap the rows in a DataFrame.

        :param sql: query text
        :param col: column labels for the result, e.g. ``['a', 'b']``
        :return: ``pandas.DataFrame`` built from the returned rows
        """
        data = self.execute(sql)
        return pd.DataFrame(data, columns=col)

    def getData_pdv2(self, sql):
        """Run ``sql`` and build a DataFrame labelled with the result's columns.

        :param sql: query text
        :return: ``pandas.DataFrame`` of the result rows
        """
        rows = self.client.execute_iter(sql, with_column_types=True)
        # The first item yielded describes the columns; its entries are
        # indexed with [0] to obtain the column names.
        columns = [column[0] for column in next(rows)]
        return pd.DataFrame.from_records(rows, columns=columns)

    def getData_json(self, sql):
        """Run ``sql`` and return the rows as a JSON string of records."""
        return self.getData_pdv2(sql).to_json(orient='records')

    def getColumns(self, table, is_list=False):
        """Return the column names reported by ``desc <table>``.

        :param table: table name
        :param is_list: when true return a list, otherwise a comma-joined string
        :return: list of names, or ``"a,b,c"``-style string
        """
        names = [row[0] for row in self.execute("desc " + table)]
        if is_list:
            return names
        return ",".join(names)

    def insertMany(self, table, col, data, batch_size=1000):
        """Insert ``data`` into ``table`` in batches.

        :param table: table name (str)
        :param col: comma-separated column names (str), e.g. ``"a,b,c"``
        :param data: tuple/list of rows
        :param batch_size: maximum number of rows per INSERT statement
        :return: None
        """
        if len(data) == 0:
            logging.info("data.len==0")
            return
        prefix = "insert into {} ({}) values ".format(table, col)
        # NOTE(review): row values are embedded through their Python repr, so
        # this must only be fed trusted data; the driver's own parameterised
        # insert would be the safer long-term route.
        # Iterating (rather than recursing per batch) keeps very large inputs
        # clear of Python's recursion limit.
        for start in range(0, len(data), batch_size):
            batch = data[start:start + batch_size]
            # str() of the list/tuple minus its outer brackets is the VALUES list.
            self.execute(prefix + str(batch)[1:-1])


if __name__ == '__main__':
    ck = CkUtils()
    print(ck.execute("desc order"))