DataBaseUtils.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. """
  2. @desc 数据库连接
  3. @auth chenkai
  4. @date 2020/11/19
  5. """
  6. from model.DataBaseOperation import MysqlOperation
  7. import yaml
  8. import os
  9. import pandas as pd
  10. from clickhouse_driver.client import Client
  11. import logging
  def db_config():
      """Load the database settings from ``config/db_config.yaml``.

      The file is looked up in the ``config`` directory located one level
      above the directory that contains this module.

      :return: the parsed YAML document; callers index it by section name
          (for example ``config['clickhouse']``).
      """
      p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
      path = os.path.join(p_path, "config", "db_config.yaml")
      # Use a context manager so the file handle is released even when the
      # YAML parser raises.
      with open(path, encoding="utf-8") as f:
          # NOTE(review): FullLoader is kept for compatibility. If the config
          # only holds plain scalars/maps, yaml.safe_load would be sufficient.
          return yaml.load(f.read(), Loader=yaml.FullLoader)
  class MysqlUtils:
      """Lazily opened MySQL connections, one per section of the DB config.

      Every property returns a ``MysqlOperation`` built from the matching
      section of ``self.config``. The connection is created on first access
      and reused afterwards, so repeated property access does not open a new
      connection each time. :meth:`close` closes whatever has been opened and
      clears the cache, so a later access reconnects.
      """

      # Cache slots, one per connection; ``None`` means "not opened yet".
      _quchen_text = None
      _zx = None
      _zx_platform = None
      _zx_ads = None
      _dm = None
      _zx_test = None

      # Order in which close() walks the cache slots.
      _SLOTS = ("_quchen_text", "_zx", "_zx_platform", "_dm", "_zx_test", "_zx_ads")

      def __init__(self):
          self.config = db_config()

      def _connection(self, slot, section, with_port=False):
          """Return the cached connection for *slot*, opening it if needed.

          :param slot: name of the attribute that caches the connection
          :param section: key of the settings section inside ``self.config``
          :param with_port: also forward ``conf['port']`` to ``MysqlOperation``
          :return: the ``MysqlOperation`` instance stored in *slot*
          """
          db = getattr(self, slot)
          if db is None:
              conf = self.config[section]
              kwargs = {
                  'host': conf['host'],
                  'user': conf['user'],
                  'passwd': conf['passwd'],
                  'db': conf['db'],
              }
              if with_port:
                  kwargs['port'] = conf['port']
              db = MysqlOperation(**kwargs)
              setattr(self, slot, db)
          return db

      @property
      def db_mp(self):
          """Alias of :attr:`quchen_text` (same config section, same slot)."""
          return self.quchen_text

      @property
      def quchen_text(self):
          """Connection configured by the ``quchen_text`` section."""
          return self._connection("_quchen_text", 'quchen_text')

      @property
      def zx(self):
          """Connection configured by the ``zx`` section."""
          return self._connection("_zx", 'zx')

      @property
      def zx_platform(self):
          """Connection configured by the ``zx_platform`` section.

          Kept in its own slot so it cannot overwrite the ``zx`` connection.
          """
          return self._connection("_zx_platform", 'zx_platform')

      @property
      def zx_ads(self):
          """Connection configured by the ``zx_ads`` section."""
          return self._connection("_zx_ads", 'zx_ads')

      @property
      def zx_test(self):
          """Connection configured by the ``zx_test`` section."""
          return self._connection("_zx_test", 'zx_test')

      @property
      def dm(self):
          """Connection configured by the ``dm`` section (passes ``port`` too)."""
          return self._connection("_dm", 'dm', with_port=True)

      def find_db(self, db):
          """Look up a connection by database name.

          :param db: database name; only ``"quchen_text"`` is recognised
          :return: the ``quchen_text`` connection (opened on demand), or
              ``None`` after logging a message for any other name
          """
          if db == "quchen_text":
              # Go through the property so the connection exists even when it
              # has not been accessed before.
              return self.quchen_text
          logging.info("输入数据库有误")
          return None

      def close(self):
          """Close the cursor and connection of every opened database.

          Slots are reset to ``None`` so that closed handles are never handed
          out again; calling this method twice is harmless.
          """
          for slot in self._SLOTS:
              db = getattr(self, slot)
              if db is None:
                  continue
              db.cursor.close()
              db.conn.close()
              setattr(self, slot, None)
  class CkUtils:
      """Convenience wrapper around a ``clickhouse_driver`` ``Client``.

      The client is created from the ``clickhouse`` section of the database
      config and exposed as ``self.client``.
      """

      def __init__(self):
          self.config = db_config()
          conf = self.config['clickhouse']
          self.client = Client(host=conf['host'],
                               user=conf['user'],
                               password=conf['passwd'],
                               port=conf['port'],
                               send_receive_timeout=600)

      def execute(self, sql):
          """Run *sql* on the client and return whatever the client returns."""
          return self.client.execute(sql)

      def getData_pd(self, sql, col):
          """Run *sql* and wrap the rows in a DataFrame.

          :param sql: query text
          :param col: column labels for the result, e.g. ``['a', 'b']``
          :return: ``pandas.DataFrame`` with the given column labels
          """
          data = self.execute(sql)
          df = pd.DataFrame(data, columns=col)
          return df

      def getData_pdv2(self, sql):
          """Run *sql* and build a DataFrame labelled with the result's columns.

          The first item yielded by ``execute_iter(..., with_column_types=True)``
          is consumed as the column description; the remaining items are rows.
          """
          data = self.client.execute_iter(sql, with_column_types=True)
          columns = [column[0] for column in next(data)]
          df = pd.DataFrame.from_records(data, columns=columns)
          return df

      def getData_json(self, sql):
          """Run *sql* and return the result as a JSON string of records."""
          return self.getData_pdv2(sql).to_json(orient='records')

      def getColumns(self, table, is_list=False):
          """Return the column names reported by ``desc <table>``.

          :param table: table name, appended verbatim to ``desc``
          :param is_list: return a list when true, otherwise a comma-joined
              string (empty string when there are no columns)
          """
          names = [row[0] for row in self.execute("desc " + table)]
          if is_list:
              return names
          return ",".join(names)

      def insertMany(self, table, col, data, batch_size=1000):
          """Insert *data* in batches using literal ``insert ... values`` SQL.

          :param table: table name (str)
          :param col: column names as one string, e.g. ``"a,b,c"``
          :param data: tuple/list of rows
          :param batch_size: maximum number of rows per statement
          :raises ValueError: if *batch_size* is smaller than 1
          :return: None
          """
          if batch_size < 1:
              raise ValueError("batch_size must be >= 1")
          if len(data) == 0:
              logging.info("data.len==0")
              return
          prefix = "insert into {} ({}) values ".format(table, col)
          # Plain loop instead of one recursive call per batch, so very large
          # inputs cannot hit the interpreter's recursion limit.
          for start in range(0, len(data), batch_size):
              chunk = data[start:start + batch_size]
              # Joining the row reprs matches str(list)[1:-1] and avoids the
              # trailing comma that str(tuple)[1:-1] leaves for a single row.
              # NOTE(review): values are embedded as Python reprs, so this is
              # only safe for trusted data whose repr is valid SQL.
              values = ", ".join(repr(row) for row in chunk)
              self.execute(prefix + values)
  if __name__ == '__main__':
      # Manual smoke check: describe the ``order`` table and print the result.
      client = CkUtils()
      description = client.execute("desc order")
      print(description)
  171. print(ck.execute("desc order"))