""" @desc 数据库连接 @auth chenkai @date 2020/11/19 """ from model.DataBaseOperation import MysqlOperation from model.log import logger import yaml import os import pandas as pd from clickhouse_driver.client import Client import json log = logger() class MysqlUtils: _quchen_text = None _zx=None _zx_ads=None _dm=None _zx_test =None def __init__(self): p_path =os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) path = os.path.join(p_path,"config", "db_config.yaml") f = open(path, encoding="utf-8") self.config = yaml.load(f.read(), Loader=yaml.FullLoader) @property def quchen_text(self): conf = self.config['quchen_text'] self._quchen_text = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._quchen_text @property def zx(self): conf = self.config['zx'] self._zx = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._zx @property def zx_ads(self): conf = self.config['zx_ads'] self._zx_ads = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._zx_ads @property def zx_test(self): conf = self.config['zx_test'] self._zx_test = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._zx_test @property def dm(self): conf = self.config['dm'] self._dm = MysqlOperation(host=conf['host'], user=conf['user'], passwd=conf['passwd'], db=conf['db']) return self._dm def find_db(self, db): if db == "quchen_text": self._quchen_text = self._quchen_text return self._quchen_text else: log.debug("输入数据库有误") def close(self): if self._quchen_text: self._quchen_text.cursor.close() self._quchen_text.conn.close() if self._zx: self._zx.cursor.close() self._zx.conn.close() if self._dm: self._dm.cursor.close() self._dm.conn.close() if self._zx_test: self._zx_test.cursor.close() self._zx_test.conn.close() if self._zx_ads: self._zx_ads.cursor.close() self._zx_ads.conn.close() class CkUtils: def __init__(self): self.client = Client(host='cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com', user='qucheng_ck', password='Qc123456', port='3306', send_receive_timeout=5) def execute(self, sql): return self.client.execute(sql) def getData_pd(self,sql,col): """ :param sql: :param col: ['a','b'] :return: """ data=self.execute(sql) df=pd.DataFrame(data,columns=col) return df def getData_pdv2(self,sql): data = self.client.execute_iter(sql,with_column_types=True) columns = [column[0] for column in next(data)] df = pd.DataFrame.from_records(data, columns=columns) return df # print(df.to_json(orient='records')) def getData_json(self,sql): return self.getData_pdv2(sql).to_json(orient='records') def getColumns(self,table,is_list=False): data=self.execute("desc "+table) li=[] str = '' for i in data: li.append(i[0]) str+=i[0]+',' if is_list: return li else: return str[:-1] def insertMany(self,table,col,data): """ :param table: 表名 srt :param col: 字段名 srt eg: ”a,b,c“ :param data: tuple/list :return: """ max=1000 sql="insert into {} ({}) values ".format(table,col) if len(data) == 0: log.debug("data.len==0") return if len(data) <= max: sql = sql+str(data)[1:-1] # log.info(sql) # log.info("insert {} rows".format(len(data))) self.execute(sql) return else: sql2=sql+str(data[:max])[1:-1] # log.info(sql2) self.execute(sql2) # log.info("insert {} rows".format(max)) self.insertMany(table,col,data[max:]) if __name__ == '__main__': # p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) # print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) ck=CkUtils() print(ck.execute("desc order"))