DataBaseOperation.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. """
  2. @desc 数据库操作方法封装
  3. @auth chenkai
  4. @date 2020/11/19
  5. @py_version py3.6
  6. """
  7. import pymysql
  8. import logging as log
  9. import pandas as pd
  10. import time
  11. pd.set_option('display.max_columns', None)
  12. pd.set_option('display.width', 1000)
  13. MYSQL_DEBUG = 1
  14. class MysqlOperation:
  15. def __init__(self, host, user, passwd, db, port=3306):
  16. #TODO:进行修改,不可以使用try catch
  17. try:
  18. self.conn = pymysql.connect(host=host,
  19. user=user,
  20. passwd=passwd,
  21. db=db,
  22. charset='utf8mb4',
  23. port=port)
  24. self.cursor = self.conn.cursor()
  25. except Exception as e:
  26. log.info(e)
  27. def set_dict_cursor(self):
  28. """
  29. 设置字典形式取数据
  30. """
  31. self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  32. def getData(self, sql, args=None):
  33. """
  34. :param sql:
  35. :param args:
  36. :return: tuple(tuple)
  37. """
  38. start = time.time()
  39. self.cursor.execute(sql, args=args)
  40. result = self.cursor.fetchall()
  41. if MYSQL_DEBUG:
  42. sql_str = sql % tuple(args) if args else sql
  43. log.info('sql: \n' + sql_str)
  44. log.info('sql cost: %s' % (time.time() - start))
  45. return result
  46. def get_data_list(self, sql, arg=None):
  47. """
  48. :param sql:
  49. :param arg:
  50. :return: list[list]
  51. """
  52. data = self.getData(sql, arg)
  53. li = []
  54. for i in data:
  55. li.append(list(i))
  56. return li
  57. def getDataOneList(self, sql):
  58. """获取一列"""
  59. data = self.getData(sql)
  60. li = []
  61. for i in data:
  62. li.append(i[0])
  63. return li
  64. def execute(self, sql, data=None):
  65. start = time.time()
  66. if data:
  67. k = self.cursor.execute(sql, data)
  68. else:
  69. k = self.cursor.execute(sql)
  70. self.conn.commit()
  71. # if MYSQL_DEBUG:
  72. #
  73. # # log.info('sql: \n' + sql)
  74. # log.info('sql cost: %s' % (time.time() - start))
  75. log.info(f"affect rows :{k}")
  76. def executeMany(self, sql, data):
  77. start = time.time()
  78. k = self.cursor.executemany(sql, data)
  79. self.conn.commit()
  80. # if MYSQL_DEBUG:
  81. # log.info('sql: \n' + sql)
  82. # log.info('sql cost: %s' % (time.time() - start))
  83. log.info(f"\033[1;36maffect rows :{k} \033[0m")
  84. def getOne(self, sql, args=None):
  85. result = self.getData(sql, args)
  86. return result[0][0]
  87. def getData_pd(self, sql, args=None):
  88. start = time.time()
  89. # if args:
  90. # log.debug(sql % tuple(args))
  91. # else:
  92. # log.debug(sql)
  93. self.cursor.execute(sql, args=args)
  94. num_fields = len(self.cursor.description)
  95. field_names = [i[0] for i in self.cursor.description]
  96. df = self.cursor.fetchall()
  97. df = pd.DataFrame(data=list(df), columns=field_names)
  98. if MYSQL_DEBUG:
  99. sql_str = sql % tuple(args) if args else sql
  100. log.info('sql: \n' + sql_str)
  101. log.info('sql cost: %s' % (time.time() - start))
  102. return df
  103. def pd_data_sql(self, sql):
  104. df = pd.read_sql(sql=sql, con=self.conn)
  105. for index, df_type in df.dtypes.items():
  106. if df_type != 'object':
  107. df[index] = df[index].astype('object')
  108. return df
  109. # def insertData(self, sql, args=None):
  110. # # if args:
  111. # # log.debug(sql % tuple(args))
  112. # # else:
  113. # # log.debug(sql)
  114. # start = time.time()
  115. # self.cursor.execute(sql, args=args)
  116. #
  117. # if MYSQL_DEBUG:
  118. # sql_str = sql % tuple(args) if args else sql
  119. # log.info('sql: \n' + sql_str)
  120. # log.info('sql cost: %s' % (time.time() - start))
  121. # self.conn.commit()
  122. def executeWithoutCommit(self, sql, args=None):
  123. return self.cursor.execute(sql, args=args)
  124. def commit(self):
  125. self.conn.commit()
    def insertorupdate(self, table, keys, tags, tagvalue, flag, *args):
        """
        Insert one row unless a row with the same key values already exists; in
        that case update the existing row instead. Commits before returning.

        :param table: table name
        :param keys: tuple of (composite) key column names
        :param tags: tuple of the other column names to write
        :param tagvalue: values for ``tags``, in the same order
        :param args: key values, in the same order as ``keys``
        :param flag: when truthy, log the last statement executed (values filled in)
        :return: None
        """
        # log.info(tags)
        # Conditional insert:
        #   INSERT INTO t (k..., tag...) SELECT %s,... FROM DUAL
        #   WHERE NOT EXISTS (SELECT id FROM t WHERE k1=%s AND ... LIMIT 1)
        # The sub-query names an ``id`` column, so the table is expected to have one.
        sql = "INSERT INTO " + table + " ("
        sql += ",".join(keys) + ","
        sql += ",".join(tags)
        sql += ") SELECT "
        sql += "%s," * len(keys)
        sql += ("%s," * len(tags))[:-1]  # drop the last comma
        sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
        sql += " WHERE "
        for _ in keys:
            sql += _ + "=%s AND "
        sql = sql[:-4]  # strip the trailing "AND " (the space before it remains)
        sql += "LIMIT 1)"
        # Parameter order follows the placeholders: key values and tag values for
        # the SELECT list, then the key values again for the NOT EXISTS filter.
        arg = list(args)
        arg.extend(tagvalue)
        arg.extend(list(args))
        rows = self.cursor.execute(sql, args=arg)
        if rows == 0:
            # Nothing was inserted, i.e. a row with these key values exists:
            # UPDATE t SET tag1=%s,... WHERE k1=%s AND ...
            sql = "UPDATE " + table + " SET "
            for _ in tags:
                sql += _ + "=%s,"
            sql = sql[:-1]
            sql += " WHERE "
            for _ in keys:
                sql += _ + "=%s AND "
            sql = sql[:-4]
            arg = []
            arg.extend(tagvalue)
            arg.extend(list(args))
            self.cursor.execute(sql, args=arg)
        # NOTE(review): the source's indentation was lost; this log call is placed
        # after both branches so it logs whichever statement ran last -- confirm.
        if flag:
            log.info(sql % tuple(arg))
        self.conn.commit()
    def _insertorupdate(self, table, keys, tags, tag_value, flag, key_value, update=False):
        """
        Write one row on the shared cursor without committing.

        ``update`` falsy: run a conditional INSERT that only inserts when no row
        has the same key values; if it inserted nothing, UPDATE that row instead.
        ``update`` truthy: run only the UPDATE.

        :param table: table name
        :param keys: (composite) key column names
        :param tags: other column names to write
        :param tag_value: values for ``tags``, in the same order
        :param flag: when truthy, log the last statement executed (values filled in)
        :param key_value: key values, in the same order as ``keys``
        :param update: skip the conditional INSERT and only UPDATE
        :return: None
        """
        if not update:
            # INSERT INTO t (k..., tag...) SELECT %s,... FROM DUAL
            # WHERE NOT EXISTS (SELECT id FROM t WHERE k1=%s AND ... LIMIT 1)
            # The sub-query names an ``id`` column, so the table is expected to have one.
            sql = "INSERT INTO " + table + " ("
            sql += ",".join(keys) + ","
            sql += ",".join(tags)
            sql += ") SELECT "
            sql += "%s," * len(keys)
            sql += ("%s," * len(tags))[:-1]  # drop the last comma
            sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
            sql += " WHERE "
            for _ in keys:
                sql += _ + "=%s AND "
            sql = sql[:-4]  # strip the trailing "AND "
            sql += "LIMIT 1)"
            # Parameters: key values + tag values for the SELECT list, then the key
            # values again for the NOT EXISTS filter.
            arg = list(key_value)
            arg.extend(tag_value)
            arg.extend(list(key_value))
            rows = self.cursor.execute(sql, args=arg)
            if rows == 0:
                # Nothing inserted: a row with these key values exists, update it.
                sql = "UPDATE " + table + " SET "
                for _ in tags:
                    sql += _ + "=%s,"
                sql = sql[:-1]
                sql += " WHERE "
                for _ in keys:
                    sql += _ + "=%s AND "
                sql = sql[:-4]
                arg = []
                arg.extend(tag_value)
                arg.extend(list(key_value))
                self.cursor.execute(sql, args=arg)
            # NOTE(review): the source's indentation was lost; this log call is
            # placed after the rows == 0 branch so it covers both paths -- confirm.
            if flag:
                log.info(sql % tuple(arg))
        else:
            # UPDATE t SET tag1=%s,... WHERE k1=%s AND ...
            sql = "UPDATE " + table + " SET "
            for _ in tags:
                sql += _ + "=%s,"
            sql = sql[:-1]
            sql += " WHERE "
            for _ in keys:
                sql += _ + "=%s AND "
            sql = sql[:-4]
            arg = []
            arg.extend(tag_value)
            arg.extend(list(key_value))
            self.cursor.execute(sql, args=arg)
            if flag:
                log.info(sql % tuple(arg))
  217. def _insert_on_duplicate(self, table, keys, tags, tag_value, flag, key_value):
  218. name_all = list(keys)
  219. name_all.extend(tags)
  220. arg = list(key_value)
  221. arg.extend(tag_value)
  222. arg.extend(tag_value)
  223. sql_name = '(' + ','.join(name_all) + ')'
  224. sql_value = '(' + ','.join(['%s'] * len(name_all)) + ')'
  225. sql_update = ','.join([_ + '=%s' for _ in tags])
  226. sql = """
  227. insert into %s
  228. %s
  229. VALUES %s
  230. ON duplicate key UPDATE %s
  231. """ % (table, sql_name, sql_value, sql_update)
  232. self.cursor.execute(sql, args=arg)
  233. if flag:
  234. log.debug(sql % tuple(arg))
  235. def insertorupdatemany(self, table, keys, tags, tag_values, key_values, flag=False, unique_key=False, update=False):
  236. """
  237. :param table: 表名
  238. :param keys: 联合主键名元组
  239. :param tags: 字段名元组
  240. :param tag_values: 字段值组(list or pd.DataFrame)
  241. :param key_values: 主键值组(list or pd.DataFrame)
  242. :param flag: 控制是否打印日志
  243. :param unique_key: keys 是否为table的 unique_key
  244. :return:
  245. ps: 效率(外网): rows / 50; 1000以上更新使用
  246. """
  247. if isinstance(tag_values, pd.DataFrame):
  248. list_tag_value = [list(tag_values.iloc[_, :]) for _ in range(len(tag_values))]
  249. else:
  250. list_tag_value = list(tag_values)
  251. if isinstance(key_values, pd.DataFrame):
  252. list_key_value = [list(key_values.iloc[_, :]) for _ in range(len(key_values))]
  253. else:
  254. list_key_value = list(key_values)
  255. for _ in range(len(list_tag_value)):
  256. tag_value = list_tag_value[_]
  257. key_value = list_key_value[_]
  258. if unique_key:
  259. self._insert_on_duplicate(table, keys, tags, tag_value, flag, key_value)
  260. else:
  261. self._insertorupdate(table, keys, tags, tag_value, flag, key_value, update)
  262. self.conn.commit()
  263. def _check_repeat_key(self, key_list):
  264. tmp = list(map(lambda x: tuple(x), key_list))
  265. if len(tmp) == len(set(tmp)):
  266. return False
  267. else:
  268. last_data = -1
  269. repeat_key = set()
  270. for i in sorted(tmp, key=lambda x: str(x)):
  271. if last_data == i:
  272. repeat_key.add(i)
  273. if len(repeat_key) >= 10:
  274. break
  275. last_data = i
  276. log.error('Reject repeated keys')
  277. log.error('repeat_key: %s' % repeat_key)
  278. return True
  279. def _convert_to_list(self, data):
  280. if isinstance(data, pd.DataFrame):
  281. # np.nan != np.nan 从而判断值为np.nan
  282. list_data = [map(lambda x: None if x != x else x, list(data.iloc[_, :])) for _ in range(len(data))]
  283. li = []
  284. for i in list_data:
  285. li.append(list(i))
  286. list_data = li
  287. else:
  288. list_data = list(data)
  289. return list_data
  290. def _get_exist_keys_index(self, table, keys, key_values, flag=False):
  291. list_sql_when = []
  292. list_tmp = []
  293. for i in range(len(key_values)):
  294. sql_when = """when (%s)=(%s) then %s""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])), i)
  295. list_sql_when.append(sql_when)
  296. list_tmp.extend(key_values[i])
  297. list_sql_condition = []
  298. for i in range(len(key_values)):
  299. # sql_condition_old = """(%s)=(%s)""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])))
  300. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  301. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  302. # log.info sql_condition_old, sql_condition
  303. list_sql_condition.append(sql_condition)
  304. list_tmp.extend(key_values[i])
  305. sql_where = ' or '.join(list_sql_condition)
  306. sql_case = '\n'.join(list_sql_when)
  307. sql = """
  308. select
  309. case
  310. %s
  311. end
  312. from %s
  313. where %s
  314. """ % (sql_case, table, sql_where)
  315. if flag:
  316. log.info(sql % tuple(list_tmp))
  317. self.cursor.execute(sql, tuple(list_tmp))
  318. result = self.cursor.fetchall()
  319. return map(lambda x: x[0], result)
  320. def insertorupdatemany_v2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  321. """
  322. 更新插入多条数据(无key时自动插入, 有keys时更新)
  323. :param table: 表名
  324. :param keys: 联合主键名元组
  325. :param tags: 字段名元组
  326. :param tag_values: 字段值组(list or pd.DataFrame)
  327. :param key_values: 主键值组(list or pd.DataFrame)
  328. :param flag: 控制是否打印日志
  329. :param split: 切割阈值
  330. :return:
  331. ps: 效率(外网): rows^2 / 50000; rows以split为单位分批更新
  332. """
  333. if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
  334. log.error('Type Error')
  335. exit(-1)
  336. return
  337. if len(tag_values) > split:
  338. length = len(tag_values)
  339. for i in range(0, length, split):
  340. start, finish = i, i + split
  341. self.insertorupdatemany_v2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
  342. split=split)
  343. return
  344. if len(key_values) == 0 or len(tag_values) == 0:
  345. log.debug('insert or update 0 rows')
  346. return
  347. tag_values = self._convert_to_list(tag_values)
  348. key_values = self._convert_to_list(key_values)
  349. assert self._check_repeat_key(key_values) == False
  350. exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
  351. new_key_index = list(set(range(len(key_values))) - set(exist_key_index))
  352. update_keys = list(map(lambda x: key_values[x], exist_key_index))
  353. update_tags = list(map(lambda x: tag_values[x], exist_key_index))
  354. insert_keys = list(map(lambda x: key_values[x], new_key_index))
  355. insert_tags = list(map(lambda x: tag_values[x], new_key_index))
  356. self.insert_many(table=table,
  357. keys=keys,
  358. tags=tags,
  359. tag_values=insert_tags,
  360. key_values=insert_keys,
  361. flag=flag)
  362. self.update_many(table=table,
  363. keys=keys,
  364. tags=tags,
  365. tag_values=update_tags,
  366. key_values=update_keys,
  367. flag=flag,
  368. split=split)
  369. def updateManyV2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  370. if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
  371. log.error('Type Error')
  372. exit(-1)
  373. return
  374. if len(tag_values) > split:
  375. length = len(tag_values)
  376. for i in range(0, length, split):
  377. start, finish = i, i + split
  378. self.updateManyV2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
  379. split=split)
  380. return
  381. if len(key_values) == 0 or len(tag_values) == 0:
  382. log.info('update 0 rows')
  383. return
  384. tag_values = self._convert_to_list(tag_values)
  385. key_values = self._convert_to_list(key_values)
  386. assert self._check_repeat_key(key_values) == False
  387. exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
  388. update_keys = list(map(lambda x: key_values[x], exist_key_index))
  389. update_tags = list(map(lambda x: tag_values[x], exist_key_index))
  390. self.update_many(table=table,
  391. keys=keys,
  392. tags=tags,
  393. tag_values=update_tags,
  394. key_values=update_keys,
  395. flag=flag,
  396. split=split)
  397. def dfUpdate2mysql(self, df, table, key, tag):
  398. self.updateManyV2(
  399. table=table,
  400. keys=key,
  401. tags=tag,
  402. tag_values=df[tag],
  403. key_values=df[key], split=1000
  404. )
  405. def insertorupdatemany_v3(self, df, table, keys, tags, flag=False, split=80):
  406. self.insertorupdatemany_v2(
  407. table=table,
  408. keys=keys,
  409. tags=tags,
  410. tag_values=df[tags],
  411. key_values=df[keys],
  412. flag=flag,
  413. split=split
  414. )
  415. def dfsave2mysql(self, df, table, key, tag):
  416. self.insertorupdatemany_v2(
  417. table=table,
  418. keys=key,
  419. tags=tag,
  420. tag_values=df[tag],
  421. key_values=df[key]
  422. )
  423. def _get_s_format(self, data):
  424. """
  425. Args:
  426. data: [[featureA1, featureB1, ...], [featureA2, featureB2, ...], ...]
  427. Returns:
  428. format of %s and real value
  429. Example:
  430. [['2017-07-01', 78], ['2017-07-01', 1]] ->
  431. ('((%s, %s), (%s, %s))', ['2017-07-01', 78, '2017-07-01', 1])
  432. """
  433. list_tmp_s = []
  434. values = []
  435. for _ in data:
  436. tmp_s = ','.join(len(_) * ['%s'])
  437. values.extend(_)
  438. if len(_) > 1:
  439. tmp_s = '(' + tmp_s + ')'
  440. list_tmp_s.append(tmp_s)
  441. format_s = '(' + ','.join(list_tmp_s) + ')'
  442. return format_s, values
  443. def delete_by_key(self, table, keys, key_values, flag=False):
  444. """
  445. Args:
  446. table: 表名
  447. keys: 联合主键名元组
  448. key_values: 主键值组(list or pd.DataFrame)
  449. flag: 控制是否打印日志
  450. Examples:
  451. delete_by_key('table_test', keys=['date'], key_values=[['2017-07-01'], ['2017-07-02']], flag=False)
  452. delete_by_key('table_test', keys=['date'], key_values=['2017-07-01'], flag=False)
  453. """
  454. if len(key_values) == 0:
  455. return
  456. if not (isinstance(key_values[0], (list, tuple)) or isinstance(key_values, pd.DataFrame)):
  457. key_values_list = [key_values]
  458. else:
  459. key_values_list = self._convert_to_list(key_values)
  460. sql_keys = '(' + ','.join(keys) + ')'
  461. contact_s, values_s = self._get_s_format(key_values_list)
  462. sql_del = """
  463. delete from %s
  464. where %s in %s
  465. """ % (table, sql_keys, contact_s)
  466. if flag:
  467. log.info(sql_del % tuple(values_s))
  468. self.cursor.execute(sql_del, tuple(values_s))
  469. self.conn.commit()
  470. def insert_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  471. """
  472. 直接插入多条数据
  473. :param table: 表名
  474. :param keys: 联合主键名元组
  475. :param tags: 字段名元组
  476. :param tag_values: 字段值组(list or pd.DataFrame)
  477. :param key_values: 主键值组(list or pd.DataFrame)
  478. :param flag: 控制是否打印日志
  479. :return:
  480. Examples: 参照 insertorupdatemany_v2
  481. insert into table
  482. (count_date, cid, tag1, tag2)
  483. values ('2017-01-01', 10, 1, 'a'), ('2017-01-02', 20, 2, 'b'), ...
  484. """
  485. if len(key_values) == 0 or len(tag_values) == 0:
  486. log.info('insert 0 rows')
  487. return
  488. if len(tag_values) > split:
  489. length = len(tag_values)
  490. for i in range(0, length, split):
  491. start, finish = i, i + split
  492. self.insert_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
  493. split=split)
  494. return
  495. tag_values = self._convert_to_list(tag_values)
  496. key_values = self._convert_to_list(key_values)
  497. feature_total = "(" + ",".join(keys + tags) + ")"
  498. tmp_s = "(" + ",".join(["%s"] * len(keys + tags)) + ")"
  499. tmp_s_concat = ",\n".join([tmp_s] * len(key_values))
  500. sql_insert = """
  501. Insert into %s
  502. %s
  503. values %s""" % (table, feature_total, tmp_s_concat)
  504. value_insert = []
  505. for _ in zip(key_values, tag_values):
  506. value_insert.extend(_[0] + _[1])
  507. if flag:
  508. log.info(sql_insert % tuple(value_insert))
  509. t0 = time.time()
  510. self.cursor.execute(sql_insert, tuple(value_insert))
  511. log.info('insert %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  512. self.conn.commit()
  513. def update_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  514. """
  515. 更新多条数据(无key时不会自动插入)
  516. :param table: 表名
  517. :param keys: 联合主键名元组
  518. :param tags: 字段名元组
  519. :param tag_values: 字段值组(list or pd.DataFrame)
  520. :param key_values: 主键值组(list or pd.DataFrame)
  521. :param flag: 控制是否打印日志
  522. :param split: 分批更新量
  523. :return:
  524. Examples: 参照 insertorupdatemany_v2
  525. # 单条 update sql tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10)
  526. update table
  527. set tag1=1, tag2='a'
  528. where (count_date, cid) =('2017-01-01', 10)
  529. # 多条组合 update sql
  530. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  531. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  532. update table
  533. set tag1 = case
  534. when (count_date, cid)=('2017-01-01', 10) then 1
  535. when (count_date, cid)=('2017-01-02', 20) then 2
  536. ...
  537. ,
  538. tag_2 = case
  539. when (count_date, cid)=('2017-01-01', 10) then 'a'
  540. when (count_date, cid)=('2017-01-02', 20) then 'b'
  541. ...
  542. where (count_date, cid)=('2017-01-01', 10) or (count_date, cid)=('2017-01-02', 20) or ...
  543. """
  544. if len(tag_values) > split:
  545. length = len(tag_values)
  546. for i in range(0, length, split):
  547. start, finish = i, i + split
  548. self.update_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag,
  549. split=split)
  550. return
  551. if len(key_values) == 0 or len(tag_values) == 0:
  552. log.info('update 0 rows')
  553. return
  554. tag_values = self._convert_to_list(tag_values)
  555. key_values = self._convert_to_list(key_values)
  556. if self._check_repeat_key(key_values):
  557. return
  558. update_value = []
  559. sql_keys = ','.join(keys)
  560. if len(keys) > 1:
  561. sql_keys = '(' + sql_keys + ')'
  562. sql_key_values = ','.join(['%s'] * len(keys))
  563. if len(keys) > 1:
  564. sql_key_values = '(' + sql_key_values + ')'
  565. sql_set_list = []
  566. for i in range(len(tags)):
  567. sql_when_list = []
  568. for j in range(len(tag_values)):
  569. sql_when = """when %s=%s then %s """ % (sql_keys, sql_key_values, '%s')
  570. update_value.extend(key_values[j])
  571. update_value.append(tag_values[j][i])
  572. sql_when_list.append(sql_when)
  573. sql_when_concat = '\n\t'.join(sql_when_list)
  574. sql_set = """%s = case \n\t %s\n end""" % (tags[i], sql_when_concat)
  575. sql_set_list.append(sql_set)
  576. for _ in key_values:
  577. update_value.extend(_)
  578. sql_set_concat = ',\n'.join(sql_set_list)
  579. list_sql_condition = []
  580. for i in range(len(key_values)):
  581. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  582. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  583. list_sql_condition.append(sql_condition)
  584. sql_where = ' or '.join(list_sql_condition)
  585. # condition = ' or\n\t'.join([sql_keys + '=' + sql_key_values] * len(tag_values))
  586. # log.info condition
  587. sql = """update %s\n set %s\n where %s""" % (table, sql_set_concat, sql_where)
  588. if flag:
  589. log.info(sql % tuple(update_value))
  590. t0 = time.time()
  591. self.cursor.execute(sql, tuple(update_value))
  592. self.conn.commit()
  593. log.info('update %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  594. def getColumn(self, table, flag=0):
  595. "获取表的所有列"
  596. sql = "SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
  597. "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
  598. self.cursor.execute(sql)
  599. a = self.cursor.fetchall()
  600. str = ''
  601. li = []
  602. for i in a:
  603. str += i[0] + ','
  604. li.append(i[0])
  605. if flag:
  606. return li
  607. else:
  608. return str[:-1]