123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- """
- @desc 数据库连接
- @auth chenkai
- @date 2020/11/19
- """
- from model.DataBaseOperation import MysqlOperation
- import yaml
- import os
- import pandas as pd
- from clickhouse_driver.client import Client
- import logging
def db_config():
    """Load the database settings from ``config/db_config.yaml``.

    The file is looked up in the ``config`` directory located one level
    above the directory that contains this module.

    :return: the parsed YAML document, exactly as produced by ``yaml.load``
    :raises OSError: if the configuration file cannot be opened
    """
    p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
    path = os.path.join(p_path, "config", "db_config.yaml")
    # Use a context manager so the file handle is released even when parsing
    # fails; the previous version opened the file and never closed it.
    with open(path, encoding="utf-8") as f:
        # NOTE(review): FullLoader is kept for compatibility with the existing
        # config file. If the file only holds plain mappings/scalars,
        # yaml.safe_load would be the safer choice - confirm before switching.
        config = yaml.load(f.read(), Loader=yaml.FullLoader)
    return config
class MysqlUtils:
    """Builds ``MysqlOperation`` handles for the sections of the DB config.

    Every property builds a *new* ``MysqlOperation`` from one config section
    each time it is read and remembers the most recent handle in a private
    slot so that :meth:`close` can release it.

    NOTE(review): because a property access always builds a new handle, the
    handle previously stored in the same slot is dropped without being
    closed. That reconnect-on-access behaviour is preserved here because
    callers may rely on it; read a property once and reuse the returned
    object if a single connection is wanted.
    """

    # Most recently built handle per config section (None = nothing open).
    _quchen_text = None
    _zx = None
    _zx_platform = None
    _zx_ads = None
    _dm = None
    _zx_test = None

    # Slots released by close(), in release order.
    _SLOTS = ('_quchen_text', '_zx', '_zx_platform', '_dm', '_zx_test', '_zx_ads')

    def __init__(self):
        self.config = db_config()

    def _connect(self, section, with_port=False):
        """Build a ``MysqlOperation`` from ``self.config[section]``.

        :param section: key of the config section to read
        :param with_port: also pass the section's ``port`` value
        :return: the newly built ``MysqlOperation``
        :raises KeyError: if the section or one of its required keys is missing
        """
        conf = self.config[section]
        params = {
            'host': conf['host'],
            'user': conf['user'],
            'passwd': conf['passwd'],
            'db': conf['db'],
        }
        if with_port:
            params['port'] = conf['port']
        return MysqlOperation(**params)

    @property
    def db_mp(self):
        """Alias of :attr:`quchen_text` (same config section, same slot)."""
        return self.quchen_text

    @property
    def quchen_text(self):
        """New handle for the ``quchen_text`` config section."""
        self._quchen_text = self._connect('quchen_text')
        return self._quchen_text

    @property
    def zx(self):
        """New handle for the ``zx`` config section."""
        self._zx = self._connect('zx')
        return self._zx

    @property
    def zx_platform(self):
        """New handle for the ``zx_platform`` config section.

        Stored in its own slot; it used to overwrite the ``zx`` slot, which
        made ``close()`` lose track of one of the two connections.
        """
        self._zx_platform = self._connect('zx_platform')
        return self._zx_platform

    @property
    def zx_ads(self):
        """New handle for the ``zx_ads`` config section."""
        self._zx_ads = self._connect('zx_ads')
        return self._zx_ads

    @property
    def zx_test(self):
        """New handle for the ``zx_test`` config section."""
        self._zx_test = self._connect('zx_test')
        return self._zx_test

    @property
    def dm(self):
        """New handle for the ``dm`` config section (the only one passing ``port``)."""
        self._dm = self._connect('dm', with_port=True)
        return self._dm

    def find_db(self, db):
        """Return the handle for the database called ``db``.

        Only ``"quchen_text"`` is recognised. The handle already stored for
        it is returned when there is one; otherwise a connection is built on
        demand (previously ``None`` was returned in that situation).

        :param db: database name
        :return: the handle, or ``None`` (after logging) for an unknown name
        """
        if db == "quchen_text":
            if self._quchen_text is None:
                return self.quchen_text
            return self._quchen_text
        logging.info("输入数据库有误")
        return None

    def close(self):
        """Close the cursor and connection of every stored handle.

        Each released slot is reset to ``None`` so that calling ``close()``
        again does not try to close the same handle twice.
        """
        for slot in self._SLOTS:
            handle = getattr(self, slot)
            if handle is None:
                continue
            handle.cursor.close()
            handle.conn.close()
            setattr(self, slot, None)
class CkUtils:
    """Small convenience wrapper around a ``clickhouse_driver`` ``Client``."""

    def __init__(self):
        """Create the client from the ``clickhouse`` section of the DB config."""
        self.config = db_config()
        conf = self.config['clickhouse']
        self.client = Client(host=conf['host'],
                             user=conf['user'],
                             password=conf['passwd'],
                             port=conf['port'],
                             send_receive_timeout=600)

    def execute(self, sql):
        """Run ``sql`` through the client and return whatever it returns."""
        return self.client.execute(sql)

    def getData_pd(self, sql, col):
        """Run ``sql`` and wrap the result in a DataFrame.

        :param sql: query text
        :param col: column labels for the frame, e.g. ``['a', 'b']``
        :return: ``pandas.DataFrame`` built from the query result
        """
        data = self.execute(sql)
        return pd.DataFrame(data, columns=col)

    def getData_pdv2(self, sql):
        """Run ``sql`` and build a DataFrame whose columns come from the result.

        :param sql: query text
        :return: ``pandas.DataFrame`` built from the streamed rows
        """
        rows = self.client.execute_iter(sql, with_column_types=True)
        # The first item of the stream is consumed as the column descriptors;
        # the name is taken from index 0 of each descriptor.
        columns = [column[0] for column in next(rows)]
        return pd.DataFrame.from_records(rows, columns=columns)

    def getData_json(self, sql):
        """Run ``sql`` and return the rows as a JSON string (``orient='records'``)."""
        return self.getData_pdv2(sql).to_json(orient='records')

    def getColumns(self, table, is_list=False):
        """Return the column names reported by ``desc <table>``.

        :param table: table name, appended verbatim to ``desc``
        :param is_list: return a list instead of a comma-separated string
        :return: list of names, or the names joined with ``,``
        """
        names = [row[0] for row in self.execute("desc " + table)]
        if is_list:
            return names
        return ','.join(names)

    def insertMany(self, table, col, data, batch_size=1000):
        """Insert ``data`` into ``table`` in batches.

        :param table: table name (str)
        :param col: comma-separated column names, e.g. ``"a,b,c"``
        :param data: tuple/list of rows
        :param batch_size: maximum number of rows per INSERT statement
        :return: None
        :raises ValueError: if ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        if len(data) == 0:
            logging.info("data.len==0")
            return
        prefix = "insert into {} ({}) values ".format(table, col)
        # Iterate over the batches instead of recursing once per batch: no
        # recursion-depth limit and no repeated copying of the remaining rows.
        for start in range(0, len(data), batch_size):
            batch = data[start:start + batch_size]
            # Joining the row reprs gives the same text as the former
            # ``str(batch)[1:-1]`` for lists, but does not leave a trailing
            # comma when the batch is a one-element tuple.
            # NOTE(review): values are still rendered with Python's repr, so
            # the statement is only valid SQL when every value's repr is a
            # valid literal (None, for instance, is not), and it must not be
            # fed untrusted input.
            self.execute(prefix + ", ".join(repr(row) for row in batch))
if __name__ == '__main__':
    # Manual smoke check: send `desc order` through the ClickHouse wrapper
    # and print whatever comes back.
    ck_client = CkUtils()
    description = ck_client.execute("desc order")
    print(description)
|