DataBaseOperation.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
  1. """
  2. @desc 数据库操作方法封装
  3. @auth chenkai
  4. @date 2020/11/19
  5. @py_version py3.6
  6. """
  7. import pymysql
  8. import logging as log
  9. import pandas as pd
  10. import time
  11. pd.set_option('display.max_columns', None)
  12. pd.set_option('display.width', 1000)
  13. MYSQL_DEBUG = 1
  14. class MysqlOperation:
  15. def __init__(self, host, user, passwd, db, port=3306):
  16. try:
  17. self.conn = pymysql.connect(host=host,
  18. user=user,
  19. passwd=passwd,
  20. db=db,
  21. charset='utf8mb4',
  22. port=port)
  23. self.cursor = self.conn.cursor()
  24. except Exception as e:
  25. log.info(e)
  26. def set_dict_cursor(self):
  27. """
  28. 设置字典形式取数据
  29. """
  30. self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
  31. def getData(self, sql, args=None):
  32. """
  33. :param sql:
  34. :param args:
  35. :return: tuple(tuple)
  36. """
  37. start = time.time()
  38. self.cursor.execute(sql, args=args)
  39. result = self.cursor.fetchall()
  40. if MYSQL_DEBUG:
  41. sql_str = sql % tuple(args) if args else sql
  42. log.info('sql: \n' + sql_str)
  43. log.info('sql cost: %s' % (time.time() - start))
  44. return result
  45. def get_data_list(self,sql,arg=None):
  46. """
  47. :param sql:
  48. :param arg:
  49. :return: list[list]
  50. """
  51. data=self.getData(sql,arg)
  52. li=[]
  53. for i in data:
  54. li.append(list(i))
  55. return li
  56. def getDataOneList(self,sql):
  57. """获取一列"""
  58. data = self.getData(sql)
  59. li = []
  60. for i in data:
  61. li.append(i[0])
  62. return li
  63. def execute(self, sql,data=None):
  64. start = time.time()
  65. if data:
  66. k=self.cursor.execute(sql,data)
  67. else:
  68. k=self.cursor.execute(sql)
  69. self.conn.commit()
  70. # if MYSQL_DEBUG:
  71. #
  72. # # log.info('sql: \n' + sql)
  73. # log.info('sql cost: %s' % (time.time() - start))
  74. log.info(f"affect rows :{k}")
  75. def executeMany(self,sql,data):
  76. start = time.time()
  77. k=self.cursor.executemany(sql,data)
  78. self.conn.commit()
  79. # if MYSQL_DEBUG:
  80. # log.info('sql: \n' + sql)
  81. # log.info('sql cost: %s' % (time.time() - start))
  82. log.info(f"\033[1;36maffect rows :{k} \033[0m")
  83. def getOne(self,sql, args=None):
  84. result = self.getData(sql, args)
  85. return result[0][0]
  86. def getData_pd(self, sql, args=None):
  87. start = time.time()
  88. # if args:
  89. # log.debug(sql % tuple(args))
  90. # else:
  91. # log.debug(sql)
  92. self.cursor.execute(sql, args=args)
  93. num_fields = len(self.cursor.description)
  94. field_names = [i[0] for i in self.cursor.description]
  95. df = self.cursor.fetchall()
  96. df = pd.DataFrame(data=list(df), columns=field_names)
  97. if MYSQL_DEBUG:
  98. sql_str = sql % tuple(args) if args else sql
  99. log.info('sql: \n' + sql_str)
  100. log.info('sql cost: %s' % (time.time() - start))
  101. return df
  102. # def insertData(self, sql, args=None):
  103. # # if args:
  104. # # log.debug(sql % tuple(args))
  105. # # else:
  106. # # log.debug(sql)
  107. # start = time.time()
  108. # self.cursor.execute(sql, args=args)
  109. #
  110. # if MYSQL_DEBUG:
  111. # sql_str = sql % tuple(args) if args else sql
  112. # log.info('sql: \n' + sql_str)
  113. # log.info('sql cost: %s' % (time.time() - start))
  114. # self.conn.commit()
  115. def executeWithoutCommit(self, sql, args=None):
  116. return self.cursor.execute(sql, args=args)
  117. def commit(self):
  118. self.conn.commit()
  119. def insertorupdate(self, table, keys, tags, tagvalue, flag, *args):
  120. """
  121. :param table: 表名
  122. :param keys: 联合主键名元组
  123. :param tags: 字段名元组
  124. :param tagvalue: 字段值
  125. :param args: 主键值
  126. :param flag: 控制是否打印日志
  127. :return:
  128. """
  129. # log.info(tags)
  130. sql = "INSERT INTO " + table + " ("
  131. sql += ",".join(keys) + ","
  132. sql += ",".join(tags)
  133. sql += ") SELECT "
  134. sql += "%s," * len(keys)
  135. sql += ("%s," * len(tags))[:-1]
  136. sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
  137. sql += " WHERE "
  138. for _ in keys:
  139. sql += _ + "=%s AND "
  140. sql = sql[:-4]
  141. sql += "LIMIT 1)"
  142. arg = list(args)
  143. arg.extend(tagvalue)
  144. arg.extend(list(args))
  145. rows = self.cursor.execute(sql, args=arg)
  146. if rows == 0:
  147. sql = "UPDATE " + table + " SET "
  148. for _ in tags:
  149. sql += _ + "=%s,"
  150. sql = sql[:-1]
  151. sql += " WHERE "
  152. for _ in keys:
  153. sql += _ + "=%s AND "
  154. sql = sql[:-4]
  155. arg = []
  156. arg.extend(tagvalue)
  157. arg.extend(list(args))
  158. self.cursor.execute(sql, args=arg)
  159. if flag:
  160. log.info(sql % tuple(arg))
  161. self.conn.commit()
  162. def _insertorupdate(self, table, keys, tags, tag_value, flag, key_value, update=False):
  163. if not update:
  164. sql = "INSERT INTO " + table + " ("
  165. sql += ",".join(keys) + ","
  166. sql += ",".join(tags)
  167. sql += ") SELECT "
  168. sql += "%s," * len(keys)
  169. sql += ("%s," * len(tags))[:-1]
  170. sql += " FROM DUAL WHERE NOT EXISTS (SELECT id FROM " + table
  171. sql += " WHERE "
  172. for _ in keys:
  173. sql += _ + "=%s AND "
  174. sql = sql[:-4]
  175. sql += "LIMIT 1)"
  176. arg = list(key_value)
  177. arg.extend(tag_value)
  178. arg.extend(list(key_value))
  179. rows = self.cursor.execute(sql, args=arg)
  180. if rows == 0:
  181. sql = "UPDATE " + table + " SET "
  182. for _ in tags:
  183. sql += _ + "=%s,"
  184. sql = sql[:-1]
  185. sql += " WHERE "
  186. for _ in keys:
  187. sql += _ + "=%s AND "
  188. sql = sql[:-4]
  189. arg = []
  190. arg.extend(tag_value)
  191. arg.extend(list(key_value))
  192. self.cursor.execute(sql, args=arg)
  193. if flag:
  194. log.info(sql % tuple(arg))
  195. else:
  196. sql = "UPDATE " + table + " SET "
  197. for _ in tags:
  198. sql += _ + "=%s,"
  199. sql = sql[:-1]
  200. sql += " WHERE "
  201. for _ in keys:
  202. sql += _ + "=%s AND "
  203. sql = sql[:-4]
  204. arg = []
  205. arg.extend(tag_value)
  206. arg.extend(list(key_value))
  207. self.cursor.execute(sql, args=arg)
  208. if flag:
  209. log.info(sql % tuple(arg))
  210. def _insert_on_duplicate(self, table, keys, tags, tag_value, flag, key_value):
  211. name_all = list(keys)
  212. name_all.extend(tags)
  213. arg = list(key_value)
  214. arg.extend(tag_value)
  215. arg.extend(tag_value)
  216. sql_name = '(' + ','.join(name_all) + ')'
  217. sql_value = '(' + ','.join(['%s'] * len(name_all)) + ')'
  218. sql_update = ','.join([_ + '=%s' for _ in tags])
  219. sql = """
  220. insert into %s
  221. %s
  222. VALUES %s
  223. ON duplicate key UPDATE %s
  224. """ % (table, sql_name, sql_value, sql_update)
  225. self.cursor.execute(sql, args=arg)
  226. if flag:
  227. log.debug(sql % tuple(arg))
  228. def insertorupdatemany(self, table, keys, tags, tag_values, key_values, flag=False, unique_key=False, update=False):
  229. """
  230. :param table: 表名
  231. :param keys: 联合主键名元组
  232. :param tags: 字段名元组
  233. :param tag_values: 字段值组(list or pd.DataFrame)
  234. :param key_values: 主键值组(list or pd.DataFrame)
  235. :param flag: 控制是否打印日志
  236. :param unique_key: keys 是否为table的 unique_key
  237. :return:
  238. ps: 效率(外网): rows / 50; 1000以上更新使用
  239. """
  240. if isinstance(tag_values, pd.DataFrame):
  241. list_tag_value = [list(tag_values.iloc[_, :]) for _ in range(len(tag_values))]
  242. else:
  243. list_tag_value = list(tag_values)
  244. if isinstance(key_values, pd.DataFrame):
  245. list_key_value = [list(key_values.iloc[_, :]) for _ in range(len(key_values))]
  246. else:
  247. list_key_value = list(key_values)
  248. for _ in range(len(list_tag_value)):
  249. tag_value = list_tag_value[_]
  250. key_value = list_key_value[_]
  251. if unique_key:
  252. self._insert_on_duplicate(table, keys, tags, tag_value, flag, key_value)
  253. else:
  254. self._insertorupdate(table, keys, tags, tag_value, flag, key_value, update)
  255. self.conn.commit()
  256. def _check_repeat_key(self, key_list):
  257. tmp = list(map(lambda x: tuple(x), key_list))
  258. if len(tmp) == len(set(tmp)):
  259. return False
  260. else:
  261. last_data = -1
  262. repeat_key = set()
  263. for i in sorted(tmp,key=lambda x:str(x)):
  264. if last_data == i:
  265. repeat_key.add(i)
  266. if len(repeat_key) >= 10:
  267. break
  268. last_data = i
  269. log.error('Reject repeated keys')
  270. log.error('repeat_key: %s' % repeat_key)
  271. return True
  272. def _convert_to_list(self, data):
  273. if isinstance(data, pd.DataFrame):
  274. # np.nan != np.nan 从而判断值为np.nan
  275. list_data = [map(lambda x: None if x != x else x, list(data.iloc[_, :])) for _ in range(len(data))]
  276. li =[]
  277. for i in list_data:
  278. li.append(list(i))
  279. list_data = li
  280. else:
  281. list_data = list(data)
  282. return list_data
  283. def _get_exist_keys_index(self, table, keys, key_values, flag=False):
  284. list_sql_when = []
  285. list_tmp = []
  286. for i in range(len(key_values)):
  287. sql_when = """when (%s)=(%s) then %s""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])), i)
  288. list_sql_when.append(sql_when)
  289. list_tmp.extend(key_values[i])
  290. list_sql_condition = []
  291. for i in range(len(key_values)):
  292. # sql_condition_old = """(%s)=(%s)""" % (','.join(keys), ','.join(['%s'] * len(key_values[i])))
  293. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  294. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  295. # log.info sql_condition_old, sql_condition
  296. list_sql_condition.append(sql_condition)
  297. list_tmp.extend(key_values[i])
  298. sql_where = ' or '.join(list_sql_condition)
  299. sql_case = '\n'.join(list_sql_when)
  300. sql = """
  301. select
  302. case
  303. %s
  304. end
  305. from %s
  306. where %s
  307. """ % (sql_case, table, sql_where)
  308. if flag:
  309. log.info(sql % tuple(list_tmp))
  310. self.cursor.execute(sql, tuple(list_tmp))
  311. result = self.cursor.fetchall()
  312. return map(lambda x: x[0], result)
  313. def insertorupdatemany_v2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  314. """
  315. 更新插入多条数据(无key时自动插入, 有keys时更新)
  316. :param table: 表名
  317. :param keys: 联合主键名元组
  318. :param tags: 字段名元组
  319. :param tag_values: 字段值组(list or pd.DataFrame)
  320. :param key_values: 主键值组(list or pd.DataFrame)
  321. :param flag: 控制是否打印日志
  322. :param split: 切割阈值
  323. :return:
  324. ps: 效率(外网): rows^2 / 50000; rows以split为单位分批更新
  325. """
  326. if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
  327. log.error('Type Error')
  328. exit(-1)
  329. return
  330. if len(tag_values) > split:
  331. length = len(tag_values)
  332. for i in range(0, length, split):
  333. start, finish = i, i + split
  334. self.insertorupdatemany_v2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  335. return
  336. if len(key_values) == 0 or len(tag_values) == 0:
  337. log.debug('insert or update 0 rows')
  338. return
  339. tag_values = self._convert_to_list(tag_values)
  340. key_values = self._convert_to_list(key_values)
  341. assert self._check_repeat_key(key_values) == False
  342. exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
  343. new_key_index = list(set(range(len(key_values))) - set(exist_key_index))
  344. update_keys = list(map(lambda x: key_values[x], exist_key_index))
  345. update_tags = list(map(lambda x: tag_values[x], exist_key_index))
  346. insert_keys = list(map(lambda x: key_values[x], new_key_index))
  347. insert_tags = list(map(lambda x: tag_values[x], new_key_index))
  348. self.insert_many(table=table,
  349. keys=keys,
  350. tags=tags,
  351. tag_values=insert_tags,
  352. key_values=insert_keys,
  353. flag=flag)
  354. self.update_many(table=table,
  355. keys=keys,
  356. tags=tags,
  357. tag_values=update_tags,
  358. key_values=update_keys,
  359. flag=flag,
  360. split=split)
  361. def updateManyV2(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  362. if not isinstance(tag_values, (tuple, list, pd.DataFrame)):
  363. log.error('Type Error')
  364. exit(-1)
  365. return
  366. if len(tag_values) > split:
  367. length = len(tag_values)
  368. for i in range(0, length, split):
  369. start, finish = i, i + split
  370. self.updateManyV2(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  371. return
  372. if len(key_values) == 0 or len(tag_values) == 0:
  373. log.info('update 0 rows')
  374. return
  375. tag_values = self._convert_to_list(tag_values)
  376. key_values = self._convert_to_list(key_values)
  377. assert self._check_repeat_key(key_values) == False
  378. exist_key_index = list(self._get_exist_keys_index(table, keys, key_values, flag))
  379. update_keys = list(map(lambda x: key_values[x], exist_key_index))
  380. update_tags = list(map(lambda x: tag_values[x], exist_key_index))
  381. self.update_many(table=table,
  382. keys=keys,
  383. tags=tags,
  384. tag_values=update_tags,
  385. key_values=update_keys,
  386. flag=flag,
  387. split=split)
  388. def dfUpdate2mysql(self, df, table, key, tag):
  389. self.updateManyV2(
  390. table=table,
  391. keys=key,
  392. tags=tag,
  393. tag_values=df[tag],
  394. key_values=df[key],split=1000
  395. )
  396. def insertorupdatemany_v3(self, df, table, keys, tags, flag=False, split=80):
  397. self.insertorupdatemany_v2(
  398. table=table,
  399. keys=keys,
  400. tags=tags,
  401. tag_values=df[tags],
  402. key_values=df[keys],
  403. flag=flag,
  404. split=split
  405. )
  406. def dfsave2mysql(self, df, table, key, tag):
  407. self.insertorupdatemany_v2(
  408. table=table,
  409. keys=key,
  410. tags=tag,
  411. tag_values=df[tag],
  412. key_values=df[key]
  413. )
  414. def _get_s_format(self, data):
  415. """
  416. Args:
  417. data: [[featureA1, featureB1, ...], [featureA2, featureB2, ...], ...]
  418. Returns:
  419. format of %s and real value
  420. Example:
  421. [['2017-07-01', 78], ['2017-07-01', 1]] ->
  422. ('((%s, %s), (%s, %s))', ['2017-07-01', 78, '2017-07-01', 1])
  423. """
  424. list_tmp_s = []
  425. values = []
  426. for _ in data:
  427. tmp_s = ','.join(len(_) * ['%s'])
  428. values.extend(_)
  429. if len(_) > 1:
  430. tmp_s = '(' + tmp_s + ')'
  431. list_tmp_s.append(tmp_s)
  432. format_s = '(' + ','.join(list_tmp_s) + ')'
  433. return format_s, values
  434. def delete_by_key(self, table, keys, key_values, flag=False):
  435. """
  436. Args:
  437. table: 表名
  438. keys: 联合主键名元组
  439. key_values: 主键值组(list or pd.DataFrame)
  440. flag: 控制是否打印日志
  441. Examples:
  442. delete_by_key('table_test', keys=['date'], key_values=[['2017-07-01'], ['2017-07-02']], flag=False)
  443. delete_by_key('table_test', keys=['date'], key_values=['2017-07-01'], flag=False)
  444. """
  445. if len(key_values) == 0:
  446. return
  447. if not (isinstance(key_values[0], (list, tuple)) or isinstance(key_values, pd.DataFrame)):
  448. key_values_list = [key_values]
  449. else:
  450. key_values_list = self._convert_to_list(key_values)
  451. sql_keys = '(' + ','.join(keys) + ')'
  452. contact_s, values_s = self._get_s_format(key_values_list)
  453. sql_del = """
  454. delete from %s
  455. where %s in %s
  456. """ % (table, sql_keys, contact_s)
  457. if flag:
  458. log.info(sql_del % tuple(values_s))
  459. self.cursor.execute(sql_del, tuple(values_s))
  460. self.conn.commit()
  461. def insert_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  462. """
  463. 直接插入多条数据
  464. :param table: 表名
  465. :param keys: 联合主键名元组
  466. :param tags: 字段名元组
  467. :param tag_values: 字段值组(list or pd.DataFrame)
  468. :param key_values: 主键值组(list or pd.DataFrame)
  469. :param flag: 控制是否打印日志
  470. :return:
  471. Examples: 参照 insertorupdatemany_v2
  472. insert into table
  473. (count_date, cid, tag1, tag2)
  474. values ('2017-01-01', 10, 1, 'a'), ('2017-01-02', 20, 2, 'b'), ...
  475. """
  476. if len(key_values) == 0 or len(tag_values) == 0:
  477. log.info('insert 0 rows')
  478. return
  479. if len(tag_values) > split:
  480. length = len(tag_values)
  481. for i in range(0, length, split):
  482. start, finish = i, i + split
  483. self.insert_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  484. return
  485. tag_values = self._convert_to_list(tag_values)
  486. key_values = self._convert_to_list(key_values)
  487. feature_total = "(" + ",".join(keys + tags) + ")"
  488. tmp_s = "(" + ",".join(["%s"] * len(keys + tags)) + ")"
  489. tmp_s_concat = ",\n".join([tmp_s] * len(key_values))
  490. sql_insert = """
  491. Insert into %s
  492. %s
  493. values %s""" % (table, feature_total, tmp_s_concat)
  494. value_insert = []
  495. for _ in zip(key_values, tag_values):
  496. value_insert.extend(_[0] + _[1])
  497. if flag:
  498. log.info(sql_insert % tuple(value_insert))
  499. t0 = time.time()
  500. self.cursor.execute(sql_insert,tuple(value_insert))
  501. log.info('insert %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  502. self.conn.commit()
  503. def update_many(self, table, keys, tags, tag_values, key_values, flag=False, split=80):
  504. """
  505. 更新多条数据(无key时不会自动插入)
  506. :param table: 表名
  507. :param keys: 联合主键名元组
  508. :param tags: 字段名元组
  509. :param tag_values: 字段值组(list or pd.DataFrame)
  510. :param key_values: 主键值组(list or pd.DataFrame)
  511. :param flag: 控制是否打印日志
  512. :param split: 分批更新量
  513. :return:
  514. Examples: 参照 insertorupdatemany_v2
  515. # 单条 update sql tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10)
  516. update table
  517. set tag1=1, tag2='a'
  518. where (count_date, cid) =('2017-01-01', 10)
  519. # 多条组合 update sql
  520. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  521. # tag1=1, tag2='a' 插入到 (count_date, cid) =('2017-01-01', 10);
  522. update table
  523. set tag1 = case
  524. when (count_date, cid)=('2017-01-01', 10) then 1
  525. when (count_date, cid)=('2017-01-02', 20) then 2
  526. ...
  527. ,
  528. tag_2 = case
  529. when (count_date, cid)=('2017-01-01', 10) then 'a'
  530. when (count_date, cid)=('2017-01-02', 20) then 'b'
  531. ...
  532. where (count_date, cid)=('2017-01-01', 10) or (count_date, cid)=('2017-01-02', 20) or ...
  533. """
  534. if len(tag_values) > split:
  535. length = len(tag_values)
  536. for i in range(0, length, split):
  537. start, finish = i, i + split
  538. self.update_many(table, keys, tags, tag_values[start:finish], key_values[start:finish], flag, split=split)
  539. return
  540. if len(key_values) == 0 or len(tag_values) == 0:
  541. log.info('update 0 rows')
  542. return
  543. tag_values = self._convert_to_list(tag_values)
  544. key_values = self._convert_to_list(key_values)
  545. if self._check_repeat_key(key_values):
  546. return
  547. update_value = []
  548. sql_keys = ','.join(keys)
  549. if len(keys) > 1:
  550. sql_keys = '(' + sql_keys + ')'
  551. sql_key_values = ','.join(['%s'] * len(keys))
  552. if len(keys) > 1:
  553. sql_key_values = '(' + sql_key_values + ')'
  554. sql_set_list = []
  555. for i in range(len(tags)):
  556. sql_when_list = []
  557. for j in range(len(tag_values)):
  558. sql_when = """when %s=%s then %s """ % (sql_keys, sql_key_values, '%s')
  559. update_value.extend(key_values[j])
  560. update_value.append(tag_values[j][i])
  561. sql_when_list.append(sql_when)
  562. sql_when_concat = '\n\t'.join(sql_when_list)
  563. sql_set = """%s = case \n\t %s\n end""" % (tags[i], sql_when_concat)
  564. sql_set_list.append(sql_set)
  565. for _ in key_values:
  566. update_value.extend(_)
  567. sql_set_concat = ',\n'.join(sql_set_list)
  568. list_sql_condition = []
  569. for i in range(len(key_values)):
  570. row_condition_list = map(lambda x: '%s = %%s' % x, keys)
  571. sql_condition = """(%s)""" % ' and '.join(row_condition_list)
  572. list_sql_condition.append(sql_condition)
  573. sql_where = ' or '.join(list_sql_condition)
  574. # condition = ' or\n\t'.join([sql_keys + '=' + sql_key_values] * len(tag_values))
  575. # log.info condition
  576. sql = """update %s\n set %s\n where %s""" % (table, sql_set_concat, sql_where)
  577. if flag:
  578. log.info(sql % tuple(update_value))
  579. t0 = time.time()
  580. self.cursor.execute(sql, tuple(update_value))
  581. self.conn.commit()
  582. log.info('update %s rows, cost: %s' % (len(key_values), round(time.time() - t0, 2)))
  583. def getColumn(self,table,flag=0):
  584. "获取表的所有列"
  585. sql="SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " \
  586. "WHERE `TABLE_NAME`='{}' ORDER BY ordinal_position".format(table)
  587. self.cursor.execute(sql)
  588. a= self.cursor.fetchall()
  589. str=''
  590. li=[]
  591. for i in a:
  592. str+=i[0]+','
  593. li.append(i[0])
  594. if flag:
  595. return li
  596. else:
  597. return str[:-1]