DataBaseUtils.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. """
  2. @desc 数据库连接
  3. @auth chenkai
  4. @date 2020/11/19
  5. """
  6. from model.DataBaseOperation import MysqlOperation
  7. from model.log import logger
  8. import yaml
  9. import os
  10. import pandas as pd
  11. from clickhouse_driver.client import Client
# Shared module-level logger, created by calling `logger` imported from model.log.
log = logger()
  13. def db_config():
  14. p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
  15. path = os.path.join(p_path, "config", "db_config.yaml")
  16. f = open(path, encoding="utf-8")
  17. config = yaml.load(f.read(), Loader=yaml.FullLoader)
  18. return config
  19. class MysqlUtils:
  20. _quchen_text = None
  21. _zx=None
  22. _zx_ads=None
  23. _dm=None
  24. _zx_test =None
  25. def __init__(self):
  26. self.config = db_config()
  27. @property
  28. def quchen_text(self):
  29. conf = self.config['quchen_text']
  30. self._quchen_text = MysqlOperation(host=conf['host'],
  31. user=conf['user'],
  32. passwd=conf['passwd'],
  33. db=conf['db'])
  34. return self._quchen_text
  35. @property
  36. def zx(self):
  37. conf = self.config['zx']
  38. self._zx = MysqlOperation(host=conf['host'],
  39. user=conf['user'],
  40. passwd=conf['passwd'],
  41. db=conf['db'])
  42. return self._zx
  43. @property
  44. def zx_ads(self):
  45. conf = self.config['zx_ads']
  46. self._zx_ads = MysqlOperation(host=conf['host'],
  47. user=conf['user'],
  48. passwd=conf['passwd'],
  49. db=conf['db'])
  50. return self._zx_ads
  51. @property
  52. def zx_test(self):
  53. conf = self.config['zx_test']
  54. self._zx_test = MysqlOperation(host=conf['host'],
  55. user=conf['user'],
  56. passwd=conf['passwd'],
  57. db=conf['db'])
  58. return self._zx_test
  59. @property
  60. def dm(self):
  61. conf = self.config['dm']
  62. self._dm = MysqlOperation(host=conf['host'],
  63. user=conf['user'],
  64. passwd=conf['passwd'],
  65. db=conf['db'])
  66. return self._dm
  67. def find_db(self, db):
  68. if db == "quchen_text":
  69. self._quchen_text = self._quchen_text
  70. return self._quchen_text
  71. else:
  72. log.debug("输入数据库有误")
  73. def close(self):
  74. if self._quchen_text:
  75. self._quchen_text.cursor.close()
  76. self._quchen_text.conn.close()
  77. if self._zx:
  78. self._zx.cursor.close()
  79. self._zx.conn.close()
  80. if self._dm:
  81. self._dm.cursor.close()
  82. self._dm.conn.close()
  83. if self._zx_test:
  84. self._zx_test.cursor.close()
  85. self._zx_test.conn.close()
  86. if self._zx_ads:
  87. self._zx_ads.cursor.close()
  88. self._zx_ads.conn.close()
  89. class CkUtils:
  90. def __init__(self):
  91. self.config=db_config()
  92. conf = self.config['clickhouse']
  93. self.client = Client(host=conf['host'],
  94. user=conf['user'],
  95. password=conf['passwd'],
  96. port=conf['port'],
  97. send_receive_timeout=5)
  98. def execute(self, sql):
  99. return self.client.execute(sql)
  100. def getData_pd(self,sql,col):
  101. """
  102. :param sql:
  103. :param col: ['a','b']
  104. :return:
  105. """
  106. data=self.execute(sql)
  107. df=pd.DataFrame(data,columns=col)
  108. return df
  109. def getData_pdv2(self,sql):
  110. data = self.client.execute_iter(sql,with_column_types=True)
  111. columns = [column[0] for column in next(data)]
  112. df = pd.DataFrame.from_records(data, columns=columns)
  113. return df
  114. # print(df.to_json(orient='records'))
  115. def getData_json(self,sql):
  116. return self.getData_pdv2(sql).to_json(orient='records')
  117. def getColumns(self,table,is_list=False):
  118. data=self.execute("desc "+table)
  119. li=[]
  120. str = ''
  121. for i in data:
  122. li.append(i[0])
  123. str+=i[0]+','
  124. if is_list:
  125. return li
  126. else:
  127. return str[:-1]
  128. def insertMany(self,table,col,data):
  129. """
  130. :param table: 表名 srt
  131. :param col: 字段名 srt eg: ”a,b,c“
  132. :param data: tuple/list
  133. :return:
  134. """
  135. max=1000
  136. sql="insert into {} ({}) values ".format(table,col)
  137. if len(data) == 0:
  138. log.debug("data.len==0")
  139. return
  140. if len(data) <= max:
  141. sql = sql+str(data)[1:-1]
  142. # log.info(sql)
  143. # log.info("insert {} rows".format(len(data)))
  144. self.execute(sql)
  145. return
  146. else:
  147. sql2=sql+str(data[:max])[1:-1]
  148. # log.info(sql2)
  149. self.execute(sql2)
  150. # log.info("insert {} rows".format(max))
  151. self.insertMany(table,col,data[max:])
  152. if __name__ == '__main__':
  153. # p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
  154. # print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
  155. ck=CkUtils()
  156. print(ck.execute("desc order"))