DataBaseOperation.py 22 KB


  1. """
  2. @desc 数据库操作方法封装
  3. @auth chenkai
  4. @date 2020/11/19
  5. @py_version py3.6
  6. """
  7. import pymysql
  8. import logging as log
  9. import pandas as pd
  10. import time
  11. from model.log import logger
  12. log = logger()
  13. pd.set_option('display.max_columns', None)
  14. pd.set_option('display.width', 1000)
  15. MYSQL_DEBUG = 1
  16. class MysqlOperation:
  17. def __init__(self, host, user, passwd, db, port=3306):
  18. try:
  19. self.conn = pymysql.connect(host=host,
  20. user=user,
  21. passwd=passwd,
  22. db=db,
  23. charset='utf8mb4',
  24. port=port)
  25. self.cursor = self.conn.cursor()
  26. except Exception as e:
  27. log.info(e)
  28. def set_dict_cursor(self):
  29. """
  30. 设置字典形式取数据
  31. """
  32. self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  33. def getData(self, sql, args=None):
  34. """
  35. :param sql:
  36. :param args:
  37. :return: tuple(tuple)
  38. """
  39. start = time.time()
  40. self.cursor.execute(sql, args=args)
  41. result = self.cursor.fetchall()
  42. if MYSQL_DEBUG:
  43. sql_str = sql % tuple(args) if args else sql
  44. log.info('sql: \n' + sql_str)
  45. log.info('sql cost: %s' % (time.time() - start))
  46. return result
  47. def get_data_list(self,sql,arg=None):
  48. """
  49. :param sql:
  50. :param arg:
  51. :return: list[list]
  52. """
  53. data=self.getData(sql,arg)
  54. li=[]
  55. for i in data:
  56. li.append(list(i))
  57. return li
  58. def getDataOneList(self,sql):
  59. """获取一列"""
  60. data = self.getData(sql)
  61. li = []
  62. for i in data:
  63. li.append(i[0])
  64. return li
  65. def execute(self, sql,data=None):
  66. start = time.time()
  67. if data:
  68. k=self.cursor.execute(sql,data)
  69. else:
  70. k=self.cursor.execute(sql)
  71. self.conn.commit()
  72. # if MYSQL_DEBUG:
  73. #
  74. # # log.info('sql: \n' + sql)
  75. # log.info('sql cost: %s' % (time.time() - start))
  76. print(f"affect rows :{k}")
  77. def executeMany(self,sql,data):
  78. start = time.time()
  79. k=self.cursor.executemany(sql,data)
  80. self.conn.commit()
  81. # if MYSQL_DEBUG:
  82. # log.info('sql: \n' + sql)
  83. # log.info('sql cost: %s' % (time.time() - start))
  84. print(f"\033[1;36maffect rows :{k} \033[0m")
  85. def getOne(self,sql, args=None):
  86. result = self.getData(sql, args)
  87. return result[0][0]
  88. def getData_pd(self, sql, args=None):
  89. start = time.time()
  90. # if args:
  91. # log.debug(sql % tuple(args))
  92. # else:
  93. # log.debug(sql)
  94. self.cursor.execute(sql, args=args)
  95. num_fields = len(self.cursor.description)
  96. field_names = [i[0] for i in self.cursor.description]
  97. df = self.cursor.fetchall()
  98. df = pd.DataFrame(data=list(df), columns=field_names)
  99. if MYSQL_DEBUG:
  100. sql_str = sql % tuple(args) if args else sql
  101. log.info('sql: \n' + sql_str)
  102. log.info('sql cost: %s' % (time.time() - start))
  103. return df
  104. # def insertData(self, sql, args=None):
  105. # # if args:
  106. # # log.debug(sql % tuple(args))
  107. # # else:
  108. # # log.debug(sql)
  109. # start = time.time()
  110. # self.cursor.execute(sql, args=args)
  111. #
  112. # if MYSQL_DEBUG:
  113. # sql_str = sql % tuple(args) if args else sql
  114. # log.info('sql: \n' + sql_str)
  115. # log.info('sql cost: %s' % (time.time() - start))
  116. # self.conn.commit()
  117. def executeWithoutCommit(self, sql, args=None):
  118. return self.cursor.execute(sql, args=args)
  119. def commit(self):
  120. self.conn.commit()
  121. def insertorupdate(self, table, keys, tags, tagvalue, flag, *args):
  122. """
  123. :param table: 表名
  124. :param keys: 联合主键名元组
  125. :param tags: 字段名元组
  126. :param tagvalue: 字段值
  127. :param args: 主键值
  128. :param flag: 控制是否打印日志
  129. :return:
  130. """
  131. # log.info(tags)
  132. sql = "INSERT INTO " + table + " ("
  133. sql += ",".join(keys) + ","
  134. sql += ",".join(tags)
  135. sql += ") SELECT "
  136. sql += "%s," * len(keys)
  137. sql += ("%s," * len(tags))[:-1]
  138. sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
  139. sql += " WHERE "
  140. for _ in keys:
  141. sql += _ + "=%s AND "
  142. sql = sql[:-4]
  143. sql += "LIMIT 1)"
  144. arg = list(args)
  145. arg.extend(tagvalue)
  146. arg.extend(list(args))
  147. rows = self.cursor.execute(sql, args=arg)
  148. if rows == 0:
  149. sql = "UPDATE " + table + " SET "
  150. for _ in tags:
  151. sql += _ + "=%s,"
  152. sql = sql[:-1]
  153. sql += " WHERE "
  154. for _ in keys:
  155. sql += _ + "=%s AND "
  156. sql = sql[:-4]
  157. arg = []
  158. arg.extend(tagvalue)
  159. arg.extend(list(args))
  160. self.cursor.execute(sql, args=arg)
  161. if flag:
  162. log.info(sql % tuple(arg))
  163. self.conn.commit()
  164. def _insertorupdate(self, table, keys, tags, tag_value, flag, key_value, update=False):
  165. if not update:
  166. sql = "INSERT INTO " + table + " ("
  167. sql += ",".join(keys) + ","
  168. sql += ",".join(tags)
  169. sql += ") SELECT "
  170. sql += "%s," * len(keys)
  171. sql += ("%s," * len(tags))[:-1]
  172. sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
  173. sql += " WHERE "
  174. for _ in keys:
  175. sql += _ + "=%s AND "
  176. sql = sql[:-4]
  177. sql += "LIMIT 1)"
  178. arg = list(key_value)
  179. arg.extend(tag_value)
  180. arg.extend(list(key_value))
  181. rows = self.cursor.execute(sql, args=arg)
  182. if rows == 0:
  183. sql = "UPDATE " + table + " SET "
  184. for _ in tags:
  185. sql += _ + "=%s,"
  186. sql = sql[:-1]
  187. sql += " WHERE "
  188. for _ in keys:
  189. sql += _ + "=%s AND "
  190. sql = sql[:-4]
  191. arg = []
  192. arg.extend(tag_value)
  193. arg.extend(list(key_value))
  194. self.cursor.execute(sql, args=arg)
  195. if flag:
  196. log.info(sql % tuple(arg))
  197. else:
  198. sql = "UPDATE " + table + " SET "
  199. for _ in tags:
  200. sql += _ + "=%s,"
  201. sql = sql[:-1]
  202. sql += " WHERE "
  203. for _ in keys:
  204. sql += _ + "=%s AND "
  205. sql = sql[:-4]
  206. arg = []
  207. arg.extend(tag_value)
  208. arg.extend(list(key_value))
  209. self.cursor.execute(sql, args=arg)
  210. if flag:
  211. log.info(sql % tuple(arg))
  212. def _insert_on_duplicate(self, table, keys, tags, tag_value, flag, key_value):
  213. name_all = list(keys)
  214. name_all.extend(tags)
  215. arg = list(key_value)
  216. arg.extend(tag_value)
  217. arg.extend(tag_value)
  218. sql_name = '(' + ','.join(name_all) + ')'
  219. sql_value = '(' + ','.join(['%s'] * len(name_all)) + ')'
  220. sql_update = ','.join([_ + '=%s' for _ in tags])
  221. sql = """
  222. insert into %s
  223. %s
  224. VALUES %s
  225. ON duplicate key UPDATE %s
  226. """ % (table, sql_name, sql_value, sql_update)
  227. self.cursor.execute(sql, args=arg)
  228. if flag:
  229. log.debug(sql % tuple(arg))
  230. def insertorupdatemany(self, table, keys, tags, tag_values, key_values, flag=False, unique_key=False, update=False):
  231. """
  232. :param table: 表名
  233. :param keys: 联合主键名元组
  234. :param tags: 字段名元组
  235. :param tag_values: 字段值组(list or pd.DataFrame)
  236. :param key_values: 主键值组(list or pd.DataFrame)
  237. :param flag: 控制是否打印日志
  238. :param unique_key: keys 是否为table的 unique_key
  239. :return:
  240. ps: 效率(外网): rows / 50; 1000以上更新使用
  241. """
  242. if isinstance(tag_values, pd.DataFrame):
  243. list_tag_value = [list(tag_values.iloc[_, :]) for _ in range(len(tag_values))]
  244. else:
  245. list_tag_value = list(tag_values)
  246. if isinstance(key_values, pd.DataFrame):
  247. list_key_value = [list(key_values.iloc[_, :]) for _ in range(len(key_values))]
  248. else:
  249. list_key_value = list(key_values)
  250. for _ in range(len(list_tag_value)):
  251. tag_value = list_tag_value[_]
  252. key_value = list_key_value[_]
  253. if unique_key:
  254. self._insert_on_duplicate(table, keys, tags, tag_value, flag, key_value)
  255. else:
  256. self._insertorupdate(table, keys, tags, tag_value, flag, key_value, update)
  257. self.conn.commit()
  258. def _check_repeat_key(self, key_list):
  259. tmp = list(map(lambda x: tuple(x), key_list))
  260. if len(tmp) == len(set(tmp)):
  261. return False
  262. else:
  263. last_data = -1
  264. repeat_key = set()
  265. for i in sorted(tmp):
  266. if last_data == i:
  267. repeat_key.add(i)
  268. if len(repeat_key) >= 10:
  269. break
  270. last_data = i
  271. log.error('Reject repeated keys')
  272. log.error('repeat_key: %s' % repeat_key)
  273. return True
  274. def _convert_to_list(self, data):
  275. if isinstance(data, pd.DataFrame):
  276. # np.nan != np.nan 从而判断值为np.nan
  277. list_data = [map(lambda x: None if x != x else x, list(data.iloc[_, :])) for _ in range(len(data))]
  278. li =[]
  279. for i in list_data:
  280. li.append(list(i))
  281. list_data = li
  282. else:
  283. list_data = list(data)
  284. return list_data
  285. def _get_exist_keys_index(self, table, keys, key_values, flag=False):
  286. list_sql_when = []
  287. list_tmp = []
  288. for i in range(len(key_values)):
  289. sql_when = """when (%s)=(%s) then %s""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])), i)
  290. list_sql_when.append(sql_when)
  291. list_tmp.extend(key_values[i])
  292. list_sql_condition = []
  293. for i in range(len(key_values)):
  294. # sql_condition_old = """(%s)=(%s)""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])))
  295. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  296. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  297. # print sql_condition_old, sql_condition
  298. list_sql_condition.append(sql_condition)
  299. list_tmp.extend(key_values[i])
  300. sql_where = ' or '.join(list_sql_condition)
  301. sql_case = '\n'.join(list_sql_when)
  302. sql = """
  303. select
  304. case
  305. %s
  306. end
  307. from %s
  308. where %s
  309. """ % (sql_case, table, sql_where)
  310. if flag:
  311. log.info(sql % tuple(list_tmp))
  312. self.cursor.execute(sql, tuple(list_tmp))
  313. print()
  314. result = self.cursor.fetchall()
  315. return map(lambda x: x[0], result)
  316. def insertorupdatemany_v2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  317. """
  318. 更新插入多条数据(无key时自动插入, 有keys时更新)
  319. :param table: 表名
  320. :param keys: 联合主键名元组
  321. :param tags: 字段名元组
  322. :param tag_values: 字段值组(list or pd.DataFrame)
  323. :param key_values: 主键值组(list or pd.DataFrame)
  324. :param flag: 控制是否打印日志
  325. :param split: 切割阈值
  326. :return:
  327. ps: 效率(外网): rows^2 / 50000; rows以split为单位分批更新
  328. """
  329. if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
  330. log.error('Type Error')
  331. exit(-1)
  332. return
  333. if len(tag_values) > split:
  334. length = len(tag_values)
  335. for i in range(0, length, split):
  336. start, finish = i, i + split
  337. self.insertorupdatemany_v2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  338. return
  339. if len(key_values) == 0 or len(tag_values) == 0:
  340. log.debug('insert or update 0 rows')
  341. return
  342. tag_values = self._convert_to_list(tag_values)
  343. key_values = self._convert_to_list(key_values)
  344. assert self._check_repeat_key(key_values) == False
  345. exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
  346. new_key_index = list(set(range(len(key_values))) - set(exist_key_index))
  347. update_keys = list(map(lambda x: key_values[x], exist_key_index))
  348. update_tags = list(map(lambda x: tag_values[x], exist_key_index))
  349. insert_keys = list(map(lambda x: key_values[x], new_key_index))
  350. insert_tags = list(map(lambda x: tag_values[x], new_key_index))
  351. self.insert_many(table=table,
  352. keys=keys,
  353. tags=tags,
  354. tag_values=insert_tags,
  355. key_values=insert_keys,
  356. flag=flag)
  357. self.update_many(table=table,
  358. keys=keys,
  359. tags=tags,
  360. tag_values=update_tags,
  361. key_values=update_keys,
  362. flag=flag,
  363. split=split)
  364. def insertorupdatemany_v3(self, df, table, keys, tags, flag=False, split=80):
  365. self.insertorupdatemany_v2(
  366. table=table,
  367. keys=keys,
  368. tags=tags,
  369. tag_values=df[tags],
  370. key_values=df[keys],
  371. flag=flag,
  372. split=split
  373. )
  374. def dfsave2mysql(self, df, table, key, tag):
  375. self.insertorupdatemany_v2(
  376. table=table,
  377. keys=key,
  378. tags=tag,
  379. tag_values=df[tag],
  380. key_values=df[key]
  381. )
  382. def _get_s_format(self, data):
  383. """
  384. Args:
  385. data: [[featureA1, featureB1, ...], [featureA2, featureB2, ...], ...]
  386. Returns:
  387. format of %s and real value
  388. Example:
  389. [['2017-07-01', 78], ['2017-07-01', 1]] ->
  390. ('((%s, %s), (%s, %s))', ['2017-07-01', 78, '2017-07-01', 1])
  391. """
  392. list_tmp_s = []
  393. values = []
  394. for _ in data:
  395. tmp_s = ','.join(len(_) * ['%s'])
  396. values.extend(_)
  397. if len(_) > 1:
  398. tmp_s = '(' + tmp_s + ')'
  399. list_tmp_s.append(tmp_s)
  400. format_s = '(' + ','.join(list_tmp_s) + ')'
  401. return format_s, values
  402. def delete_by_key(self, table, keys, key_values, flag=False):
  403. """
  404. Args:
  405. table: 表名
  406. keys: 联合主键名元组
  407. key_values: 主键值组(list or pd.DataFrame)
  408. flag: 控制是否打印日志
  409. Examples:
  410. delete_by_key('table_test', keys=['date'], key_values=[['2017-07-01'], ['2017-07-02']], flag=False)
  411. delete_by_key('table_test', keys=['date'], key_values=['2017-07-01'], flag=False)
  412. """
  413. if len(key_values) == 0:
  414. return
  415. if not (isinstance(key_values[0], (list, tuple)) or isinstance(key_values, pd.DataFrame)):
  416. key_values_list = [key_values]
  417. else:
  418. key_values_list = self._convert_to_list(key_values)
  419. sql_keys = '(' + ','.join(keys) + ')'
  420. contact_s, values_s = self._get_s_format(key_values_list)
  421. sql_del = """
  422. delete from %s
  423. where %s in %s
  424. """ % (table, sql_keys, contact_s)
  425. if flag:
  426. log.debug(sql_del % tuple(values_s))
  427. self.cursor.execute(sql_del, tuple(values_s))
  428. self.conn.commit()
  429. def insert_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  430. """
  431. 直接插入多条数据
  432. :param table: 表名
  433. :param keys: 联合主键名元组
  434. :param tags: 字段名元组
  435. :param tag_values: 字段值组(list or pd.DataFrame)
  436. :param key_values: 主键值组(list or pd.DataFrame)
  437. :param flag: 控制是否打印日志
  438. :return:
  439. Examples: 参照 insertorupdatemany_v2
  440. insert into table
  441. (count_date, cid, tag1, tag2)
  442. values ('2017-01-01', 10, 1, 'a'), ('2017-01-02', 20, 2, 'b'), ...
  443. """
  444. if len(key_values) == 0 or len(tag_values) == 0:
  445. log.debug('insert 0 rows')
  446. return
  447. if len(tag_values) > split:
  448. length = len(tag_values)
  449. for i in range(0, length, split):
  450. start, finish = i, i + split
  451. self.insert_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  452. return
  453. tag_values = self._convert_to_list(tag_values)
  454. key_values = self._convert_to_list(key_values)
  455. feature_total = "(" + ",".join(keys + tags) + ")"
  456. tmp_s = "(" + ",".join(["%s"] * len(keys + tags)) + ")"
  457. tmp_s_concat = ",\n".join([tmp_s] * len(key_values))
  458. sql_insert = """
  459. Insert into %s
  460. %s
  461. values %s""" % (table, feature_total, tmp_s_concat)
  462. value_insert = []
  463. for _ in zip(key_values, tag_values):
  464. value_insert.extend(_[0] + _[1])
  465. if flag:
  466. log.debug(sql_insert % tuple(value_insert))
  467. t0 = time.time()
  468. self.cursor.execute(sql_insert,tuple(value_insert))
  469. log.debug('insert %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  470. self.conn.commit()
  471. def update_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  472. """
  473. 更新多条数据(无key时不会自动插入)
  474. :param table: 表名
  475. :param keys: 联合主键名元组
  476. :param tags: 字段名元组
  477. :param tag_values: 字段值组(list or pd.DataFrame)
  478. :param key_values: 主键值组(list or pd.DataFrame)
  479. :param flag: 控制是否打印日志
  480. :param split: 分批更新量
  481. :return:
  482. Examples: 参照 insertorupdatemany_v2
  483. # 单条 update sql tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10)
  484. update table
  485. set tag1=1, tag2='a'
  486. where (count_date, cid) =('2017-01-01', 10)
  487. # 多条组合 update sql
  488. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  489. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  490. update table
  491. set tag1 = case
  492. when (count_date, cid)=('2017-01-01', 10) then 1
  493. when (count_date, cid)=('2017-01-02', 20) then 2
  494. ...
  495. ,
  496. tag_2 = case
  497. when (count_date, cid)=('2017-01-01', 10) then 'a'
  498. when (count_date, cid)=('2017-01-02', 20) then 'b'
  499. ...
  500. where (count_date, cid)=('2017-01-01', 10) or (count_date, cid)=('2017-01-02', 20) or ...
  501. """
  502. if len(tag_values) > split:
  503. length = len(tag_values)
  504. for i in range(0, length, split):
  505. start, finish = i, i + split
  506. self.update_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  507. return
  508. if len(key_values) == 0 or len(tag_values) == 0:
  509. log.debug('update 0 rows')
  510. return
  511. tag_values = self._convert_to_list(tag_values)
  512. key_values = self._convert_to_list(key_values)
  513. if self._check_repeat_key(key_values):
  514. return
  515. update_value = []
  516. sql_keys = ','.join(keys)
  517. if len(keys) > 1:
  518. sql_keys = '(' + sql_keys + ')'
  519. sql_key_values = ','.join(['%s'] * len(keys))
  520. if len(keys) > 1:
  521. sql_key_values = '(' + sql_key_values + ')'
  522. sql_set_list = []
  523. for i in range(len(tags)):
  524. sql_when_list = []
  525. for j in range(len(tag_values)):
  526. sql_when = """when %s=%s then %s """ % (sql_keys, sql_key_values, '%s')
  527. update_value.extend(key_values[j])
  528. update_value.append(tag_values[j][i])
  529. sql_when_list.append(sql_when)
  530. sql_when_concat = '\n\t'.join(sql_when_list)
  531. sql_set = """%s = case \n\t %s\n end""" % (tags[i], sql_when_concat)
  532. sql_set_list.append(sql_set)
  533. for _ in key_values:
  534. update_value.extend(_)
  535. sql_set_concat = ',\n'.join(sql_set_list)
  536. list_sql_condition = []
  537. for i in range(len(key_values)):
  538. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  539. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  540. list_sql_condition.append(sql_condition)
  541. sql_where = ' or '.join(list_sql_condition)
  542. # condition = ' or\n\t'.join([sql_keys + '=' + sql_key_values] * len(tag_values))
  543. # print condition
  544. sql = """update %s\n set %s\n where %s""" % (table, sql_set_concat, sql_where)
  545. if flag:
  546. log.info(sql % tuple(update_value))
  547. t0 = time.time()
  548. self.cursor.execute(sql, tuple(update_value))
  549. self.conn.commit()
  550. log.debug('update %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  551. def getColumn(self,table,flag=0):
  552. "获取表的所有列"
  553. sql="SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
  554. "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
  555. self.cursor.execute(sql)
  556. a= self.cursor.fetchall()
  557. str=''
  558. li=[]
  559. for i in a:
  560. str+=i[0]+','
  561. li.append(i[0])
  562. if flag:
  563. return li
  564. else:
  565. return str[:-1]