DataBaseUtils.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. """
  2. @desc Database connection helpers (MySQL and ClickHouse)
  3. @auth chenkai
  4. @date 2020/11/19
  5. """
  6. from model.DataBaseOperation import MysqlOperation
  7. from model.log import logger
  8. import yaml
  9. import os
  10. from clickhouse_driver.client import Client
  11. import pandas as pd
  # Module-level logger; the classes below use it for debug messages.
  12. log = logger()
  class MysqlUtils:
      """Accessors for the MySQL databases described in ``config/db_config.yaml``.

      Each database (``quchen_text``, ``zx``, ``dm``) is exposed as a property.
      The underlying ``MysqlOperation`` is created on first access and then
      reused, so ``close()`` can release every connection that was opened.
      """

      # Cached connection handles; ``None`` means "not opened yet".
      _quchen_text = None
      _zx = None
      _dm = None

      # Config section names that have a matching ``_<name>`` cache attribute.
      _DB_NAMES = ("quchen_text", "zx", "dm")

      def __init__(self):
          """Load connection settings from ``../config/db_config.yaml``.

          The path is resolved relative to the directory containing this file.
          """
          p_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
          path = os.path.join(p_path, "config", "db_config.yaml")
          # Use a context manager so the file handle is always released.
          with open(path, encoding="utf-8") as f:
              # NOTE(review): consider yaml.safe_load if the config only holds
              # plain mappings/scalars -- confirm against the real file.
              self.config = yaml.load(f.read(), Loader=yaml.FullLoader)

      def _connect(self, name):
          """Return the cached handle for config section *name*, opening it once."""
          attr = "_" + name
          handle = getattr(self, attr)
          if handle is None:
              conf = self.config[name]
              handle = MysqlOperation(host=conf['host'],
                                      user=conf['user'],
                                      passwd=conf['passwd'],
                                      db=conf['db'])
              setattr(self, attr, handle)
          return handle

      @property
      def quchen_text(self):
          """Connection to the ``quchen_text`` database (opened on first use)."""
          return self._connect("quchen_text")

      @property
      def zx(self):
          """Connection to the ``zx`` database (opened on first use)."""
          return self._connect("zx")

      @property
      def dm(self):
          """Connection to the ``dm`` database (opened on first use)."""
          return self._connect("dm")

      def find_db(self, db):
          """Look up a connection by its config section name.

          :param db: one of ``"quchen_text"``, ``"zx"`` or ``"dm"``
          :return: the matching connection (opened if necessary), or ``None``
              after logging a debug message when the name is unknown
          """
          if db in self._DB_NAMES:
              return self._connect(db)
          log.debug("输入数据库有误")
          return None

      def close(self):
          """Close every connection that has been opened and forget it."""
          for name in self._DB_NAMES:
              attr = "_" + name
              handle = getattr(self, attr)
              if handle is None:
                  continue
              # Clear the cache first so a later access reconnects instead of
              # handing back a closed handle.
              setattr(self, attr, None)
              try:
                  handle.cursor.close()
              finally:
                  # Release the connection even if closing the cursor failed.
                  handle.conn.close()
  class CkUtils:
      """Convenience wrapper around a ``clickhouse_driver`` ``Client``."""

      # Maximum number of rows sent per INSERT statement by ``insertMany``.
      _BATCH_SIZE = 200

      def __init__(self,
                   host='cc-bp1ah87xjy8yb884s.public.clickhouse.ads.aliyuncs.com',
                   user='qc',
                   password='Qc_1234567',
                   port='3306',
                   send_receive_timeout=150):
          """Open the ClickHouse client.

          All parameters default to the values that used to be hard-coded, so
          ``CkUtils()`` behaves as before.

          NOTE(review): credentials should not live in source; move these
          defaults into configuration and rotate the password.
          """
          self.client = Client(host=host,
                               user=user,
                               password=password,
                               port=port,
                               send_receive_timeout=send_receive_timeout)

      def execute(self, sql):
          """Run *sql* on the client and return its result unchanged."""
          return self.client.execute(sql)

      def getDataList(self, sql):
          """Run *sql* and return the rows as a list of lists."""
          return [list(row) for row in self.client.execute(sql)]

      def getDataOne(self, sql):
          """Run *sql* and return the first column of the first row.

          :return: the scalar value, or ``None`` when the query returns no rows
          """
          rows = self.client.execute(sql)
          if not rows:
              return None
          return rows[0][0]

      def getData_pd(self, sql, col):
          """Run *sql* and wrap the rows in a ``pandas.DataFrame``.

          :param sql: query text
          :param col: column labels passed to ``DataFrame(columns=...)``
          :return: the resulting DataFrame
          """
          return pd.DataFrame(self.execute(sql), columns=col)

      def getColumns(self, table, is_list=False):
          """Return the column names reported by ``desc <table>``.

          :param table: table name, appended verbatim to the ``desc`` statement
          :param is_list: when true return a list of names; by default return
              them joined into one comma-separated string
          """
          names = [row[0] for row in self.execute("desc " + table)]
          if is_list:
              return names
          return ",".join(names)

      def insertMany(self, table, col, data):
          """Insert *data* into *table* in batches of at most 200 rows.

          :param table: table name (str)
          :param col: comma-separated column names, e.g. ``"a,b,c"``
          :param data: tuple/list of row tuples
          :return: None

          NOTE(review): values are rendered with ``repr`` and concatenated into
          the statement, so this is only suitable for trusted data.
          """
          if len(data) == 0:
              log.debug("data.len==0")
              return
          prefix = "insert into {} ({}) values ".format(table, col)
          # Iterate over slices instead of recursing: no recursion-depth limit
          # and no repeated copying of the remaining rows.
          for start in range(0, len(data), self._BATCH_SIZE):
              batch = data[start:start + self._BATCH_SIZE]
              # Joining per-row reprs matches str(list)[1:-1] and avoids the
              # trailing comma str(tuple) produces for a single row.
              self.execute(prefix + ", ".join(repr(row) for row in batch))
  if __name__ == '__main__':
      # Manual check: print the parent of the directory containing this file,
      # i.e. the directory MysqlUtils resolves ``config/db_config.yaml`` against.
      print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))