DataBaseUtils.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """
  2. @desc 数据库连接
  3. @auth chenkai
  4. @date 2020/11/19
  5. """
  6. from model.DataBaseOperation import MysqlOperation
  7. from model.log import logger
  8. import yaml
  9. import os
  10. from clickhouse_driver.client import Client
  11. import pandas as pd
  12. log = logger()
  13. class MysqlUtils:
  14. _quchen_text = None
  15. _zx=None
  16. def __init__(self):
  17. p_path =os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
  18. path = os.path.join(p_path,"config", "db_config.yaml")
  19. f = open(path, encoding="utf-8")
  20. self.config = yaml.load(f.read(), Loader=yaml.FullLoader)
  21. @property
  22. def quchen_text(self):
  23. conf = self.config['quchen_text']
  24. self._quchen_text = MysqlOperation(host=conf['host'],
  25. user=conf['user'],
  26. passwd=conf['passwd'],
  27. db=conf['db'])
  28. return self._quchen_text
  29. @property
  30. def zx(self):
  31. conf = self.config['zx']
  32. self._zx = MysqlOperation(host=conf['host'],
  33. user=conf['user'],
  34. passwd=conf['passwd'],
  35. db=conf['db'])
  36. return self._zx
  37. def find_db(self, db):
  38. if db == "quchen_text":
  39. self._quchen_text = self._quchen_text
  40. return self._quchen_text
  41. else:
  42. log.debug("输入数据库有误")
  43. def close(self):
  44. if self._quchen_text:
  45. self._quchen_text.cursor.close()
  46. self._quchen_text.conn.close()
  47. if self._zx:
  48. self._zx.cursor.close()
  49. self._zx.conn.close()
  50. class CkUtils:
  51. def __init__(self):
  52. self.client = Client(host='cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com',
  53. user='qucheng_ck',
  54. password='Qc123456',
  55. port='3306',
  56. send_receive_timeout=5)
  57. def execute(self, sql):
  58. return self.client.execute(sql)
  59. def getData_pd(self, sql, col):
  60. """
  61. :param sql:
  62. :param col: [[]]
  63. :return:
  64. """
  65. data = self.execute(sql)
  66. df = pd.DataFrame(data, columns=col)
  67. return df
  68. def getColumns(self, table, is_list=False):
  69. """默认返回列表"""
  70. data = self.execute("desc " + table)
  71. li = []
  72. str = ''
  73. for i in data:
  74. li.append(i[0])
  75. str += i[0] + ','
  76. if is_list:
  77. return li
  78. else:
  79. return str[:-1]
  80. def insertMany(self,table,col,data):
  81. """
  82. :param table: 表名 srt
  83. :param col: 字段名 srt eg: ”a,b,c“
  84. :param data: tuple/list
  85. :return:
  86. """
  87. max=200
  88. sql="insert into {} ({}) values ".format(table,col)
  89. if len(data) == 0:
  90. log.debug("data.len==0")
  91. return
  92. if len(data) <= max:
  93. sql = sql+str(data)[1:-1]
  94. # log.info(sql)
  95. # log.info("insert {} rows".format(len(data)))
  96. self.execute(sql)
  97. return
  98. else:
  99. sql2=sql+str(data[:max])[1:-1]
  100. # log.info(sql2)
  101. self.execute(sql2)
  102. # log.info("insert {} rows".format(max))
  103. self.insertMany(table,col,data[max:])
  104. if __name__ == '__main__':
  105. p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
  106. print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))