DataBaseUtils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
"""
@desc Database connection utilities
@auth chenkai
@date 2020/11/19
"""
  6. from model.DataBaseOperation import MysqlOperation
  7. from model.log import logger
  8. import yaml
  9. import os
  10. import pandas as pd
  11. from clickhouse_driver.client import Client
  12. import json
  13. log = logger()
  14. class MysqlUtils:
  15. _quchen_text = None
  16. _zx=None
  17. _zx_ads=None
  18. _dm=None
  19. _zx_test =None
  20. def __init__(self):
  21. p_path =os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
  22. path = os.path.join(p_path,"config", "db_config.yaml")
  23. f = open(path, encoding="utf-8")
  24. self.config = yaml.load(f.read(), Loader=yaml.FullLoader)
  25. @property
  26. def quchen_text(self):
  27. conf = self.config['quchen_text']
  28. self._quchen_text = MysqlOperation(host=conf['host'],
  29. user=conf['user'],
  30. passwd=conf['passwd'],
  31. db=conf['db'])
  32. return self._quchen_text
  33. @property
  34. def zx(self):
  35. conf = self.config['zx']
  36. self._zx = MysqlOperation(host=conf['host'],
  37. user=conf['user'],
  38. passwd=conf['passwd'],
  39. db=conf['db'])
  40. return self._zx
  41. @property
  42. def zx_ads(self):
  43. conf = self.config['zx_ads']
  44. self._zx_ads = MysqlOperation(host=conf['host'],
  45. user=conf['user'],
  46. passwd=conf['passwd'],
  47. db=conf['db'])
  48. return self._zx_ads
  49. @property
  50. def zx_test(self):
  51. conf = self.config['zx_test']
  52. self._zx_test = MysqlOperation(host=conf['host'],
  53. user=conf['user'],
  54. passwd=conf['passwd'],
  55. db=conf['db'])
  56. return self._zx_test
  57. @property
  58. def dm(self):
  59. conf = self.config['dm']
  60. self._dm = MysqlOperation(host=conf['host'],
  61. user=conf['user'],
  62. passwd=conf['passwd'],
  63. db=conf['db'])
  64. return self._dm
  65. def find_db(self, db):
  66. if db == "quchen_text":
  67. self._quchen_text = self._quchen_text
  68. return self._quchen_text
  69. else:
  70. log.debug("输入数据库有误")
  71. def close(self):
  72. if self._quchen_text:
  73. self._quchen_text.cursor.close()
  74. self._quchen_text.conn.close()
  75. if self._zx:
  76. self._zx.cursor.close()
  77. self._zx.conn.close()
  78. if self._dm:
  79. self._dm.cursor.close()
  80. self._dm.conn.close()
  81. if self._zx_test:
  82. self._zx_test.cursor.close()
  83. self._zx_test.conn.close()
  84. if self._zx_ads:
  85. self._zx_ads.cursor.close()
  86. self._zx_ads.conn.close()
  87. class CkUtils:
  88. def __init__(self):
  89. self.client = Client(host='cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com',
  90. user='qucheng_ck',
  91. password='Qc123456',
  92. port='3306',
  93. send_receive_timeout=5)
  94. def execute(self, sql):
  95. return self.client.execute(sql)
  96. def getData_pd(self,sql,col):
  97. """
  98. :param sql:
  99. :param col: ['a','b']
  100. :return:
  101. """
  102. data=self.execute(sql)
  103. df=pd.DataFrame(data,columns=col)
  104. return df
  105. def getData_pdv2(self,sql):
  106. data = self.client.execute_iter(sql,with_column_types=True)
  107. columns = [column[0] for column in next(data)]
  108. df = pd.DataFrame.from_records(data, columns=columns)
  109. return df
  110. # print(df.to_json(orient='records'))
  111. def getData_json(self,sql):
  112. return self.getData_pdv2(sql).to_json(orient='records')
  113. def getColumns(self,table,is_list=False):
  114. data=self.execute("desc "+table)
  115. li=[]
  116. str = ''
  117. for i in data:
  118. li.append(i[0])
  119. str+=i[0]+','
  120. if is_list:
  121. return li
  122. else:
  123. return str[:-1]
  124. def insertMany(self,table,col,data):
  125. """
  126. :param table: 表名 srt
  127. :param col: 字段名 srt eg: ”a,b,c“
  128. :param data: tuple/list
  129. :return:
  130. """
  131. max=1000
  132. sql="insert into {} ({}) values ".format(table,col)
  133. if len(data) == 0:
  134. log.debug("data.len==0")
  135. return
  136. if len(data) <= max:
  137. sql = sql+str(data)[1:-1]
  138. # log.info(sql)
  139. # log.info("insert {} rows".format(len(data)))
  140. self.execute(sql)
  141. return
  142. else:
  143. sql2=sql+str(data[:max])[1:-1]
  144. # log.info(sql2)
  145. self.execute(sql2)
  146. # log.info("insert {} rows".format(max))
  147. self.insertMany(table,col,data[max:])
  148. if __name__ == '__main__':
  149. # p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
  150. # print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
  151. ck=CkUtils()
  152. print(ck.execute("desc order"))