test_threadpool.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. __title__ = '每日凌晨空闲时检查本地数据库中的订单数据是否和平台昨天总订单一致'
  5. @Time : 2020/9/26 19:44
  6. @Author : Kenny-PC
  7. @Software: PyCharm
  8. # code is far away from bugs with the god animal protecting
  9. I love animals. They taste delicious.
  10. ┏┓ ┏┓
  11. ┏┛┻━━━┛┻┓
  12. ┃ ☃ ┃
  13. ┃ ┳┛ ┗┳ ┃
  14. ┃ ┻ ┃
  15. ┗━┓ ┏━┛
  16. ┃ ┗━━━┓
  17. ┃ 神兽保佑 ┣┓
  18. ┃ 永无BUG! ┏┛
  19. ┗┓┓┏━┳┓┏┛
  20. ┃┫┫ ┃┫┫
  21. ┗┻┛ ┗┻┛
  22. """
  23. import datetime
  24. import hashlib
  25. import math
  26. import time
  27. from concurrent.futures import ProcessPoolExecutor
  28. from urllib import parse
  29. import requests
  30. import random
  31. import account_list as al
  32. from MySQLConnection import MySQLConnection
  33. from util import date_util
  34. from util import platform_util
  35. def md5value(s):
  36. md5 = hashlib.md5()
  37. md5.update(s.encode("utf-8"))
  38. return md5.hexdigest()
  39. ##《1》阅文
  40. def get_yuewen_order(st, et):
  41. start_exec_seconds = date_util.getCurrentSecondTime()
  42. total_order = ()
  43. account_list = al.yuewen_account_list
  44. executor = ProcessPoolExecutor(max_workers=5)
  45. futures = []
  46. for account in account_list:
  47. future = executor.submit(get_yuewen_order_task, st, et, account)
  48. futures.append(future)
  49. executor.shutdown(True)
  50. for future in futures:
  51. if len(future.result()) > 0:
  52. total_order = future.result() + total_order
  53. print('阅文订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  54. return total_order
  55. def get_yuewen_order_task(st, et, account):
  56. order_list = ()
  57. url = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
  58. version = 1
  59. start_time = st
  60. email = account[0]
  61. appsecert = account[1]
  62. for i in range(int((et - st) / 86400)):
  63. end_time = min(start_time + 86400, et)
  64. timestamp = int(time.time())
  65. s = ''
  66. page = 1
  67. order_status = 2
  68. data = {
  69. 'email': email,
  70. 'version': version,
  71. 'timestamp': timestamp,
  72. 'start_time': start_time,
  73. 'end_time': end_time,
  74. 'page': page,
  75. 'order_status': order_status
  76. # 'last_min_id':last_min_id,
  77. # 'last_max_id':last_max_id,
  78. # 'total_count':total_count,
  79. # 'last_page':last_page
  80. }
  81. sorted_data = sorted(data.items())
  82. for k, v in sorted_data:
  83. s = s + str(k) + str(v)
  84. sign = md5value(appsecert + s).upper()
  85. data1 = {
  86. 'email': email,
  87. 'version': version,
  88. 'timestamp': timestamp,
  89. 'start_time': start_time,
  90. 'end_time': end_time,
  91. 'page': page,
  92. 'order_status': order_status,
  93. 'sign': sign
  94. }
  95. list1 = requests.get(url=url, params=data1)
  96. ## 此接口有调用频率限制,相同查询条件每分钟仅能请求一次
  97. ## exception: list1.json() {'code': 10408, 'msg': '调用频率超限'}
  98. if list1.json()['code'] != 0:
  99. print('阅文查询充值接口异常:', list1.json())
  100. break
  101. total_count = list1.json()['data']['total_count']
  102. last_min_id = list1.json()['data']['min_id']
  103. last_max_id = list1.json()['data']['max_id']
  104. last_page = list1.json()['data']['page']
  105. if total_count > 0:
  106. for x in list1.json()['data']['list']:
  107. y = {}
  108. dtime = datetime.datetime.strptime(x['order_time'], "%Y-%m-%d %H:%M:%S")
  109. y['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  110. y['platform'] = '阅文'
  111. y['channel'] = x['app_name']
  112. y['from_novel'] = x['book_name']
  113. y['user_id'] = x['openid']
  114. y['stage'] = ''
  115. y['channel_id'] = 0
  116. y['order_time'] = x['order_time']
  117. y['amount'] = x['amount']
  118. y['reg_time'] = x['reg_time']
  119. y['order_id'] = x['order_id']
  120. y = sorted(y.items(), key=lambda item: item[0])
  121. y = dict(y)
  122. y = tuple(y.values())
  123. order_list = order_list + ((y),)
  124. if total_count > 100:
  125. page_while_count = math.ceil(total_count / 100) + 1
  126. if page_while_count > 2:
  127. sleep_seconds = random.randint(60, 70)
  128. print('阅文获取订单数据线程休眠', sleep_seconds,'秒,因为该接口有一分钟的限制')
  129. time.sleep(sleep_seconds)
  130. for page in range(2, page_while_count):
  131. timestamp = int(time.time())
  132. data = {
  133. 'email': email,
  134. 'version': version,
  135. 'timestamp': timestamp,
  136. 'start_time': start_time,
  137. 'end_time': end_time,
  138. 'page': page,
  139. 'last_min_id': last_min_id,
  140. 'last_max_id': last_max_id,
  141. 'total_count': total_count,
  142. 'last_page': last_page,
  143. 'order_status': order_status
  144. }
  145. sorted_data = sorted(data.items())
  146. s1 = ''
  147. for k, v in sorted_data:
  148. s1 = s1 + str(k) + str(v)
  149. sign = md5value(appsecert + s1).upper()
  150. data2 = {
  151. 'email': email,
  152. 'version': version,
  153. 'timestamp': timestamp,
  154. 'start_time': start_time,
  155. 'end_time': end_time,
  156. 'page': page,
  157. 'last_min_id': last_min_id,
  158. 'last_max_id': last_max_id,
  159. 'total_count': total_count,
  160. 'last_page': last_page,
  161. 'order_status': order_status,
  162. 'sign': sign
  163. }
  164. list2 = requests.get(url=url, params=data2)
  165. if list2.json()['code'] != 0:
  166. print('阅文查询充值接口异常:', list2.json(), timestamp, int(time.time()))
  167. break
  168. for x in list2.json()['data']['list']:
  169. y = {}
  170. dtime = datetime.datetime.strptime(x['order_time'], "%Y-%m-%d %H:%M:%S")
  171. y['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  172. y['platform'] = '阅文'
  173. y['channel'] = x['app_name']
  174. y['from_novel'] = x['book_name']
  175. y['user_id'] = x['openid']
  176. y['stage'] = ''
  177. y['channel_id'] = 0
  178. y['order_time'] = x['order_time']
  179. y['amount'] = x['amount']
  180. y['reg_time'] = x['reg_time']
  181. y['order_id'] = x['order_id']
  182. y = sorted(y.items(), key=lambda item: item[0])
  183. y = dict(y)
  184. y = tuple(y.values())
  185. order_list = order_list + ((y),)
  186. total_count = list2.json()['data']['total_count']
  187. last_min_id = list2.json()['data']['min_id']
  188. last_max_id = list2.json()['data']['max_id']
  189. last_page = list2.json()['data']['page']
  190. start_time = start_time + 86400
  191. return order_list
  192. ##《2》掌读
  193. def get_zhangdu_order(st, et):
  194. start_exec_seconds = date_util.getCurrentSecondTime()
  195. total_order = ()
  196. account_list = al.zhangdu_account_list
  197. executor = ProcessPoolExecutor(max_workers=5)
  198. futures = []
  199. for account in account_list:
  200. future = executor.submit(get_zhangdu_order_task, st, et, account)
  201. futures.append(future)
  202. executor.shutdown(True)
  203. for future in futures:
  204. if len(future.result()) > 0:
  205. total_order = future.result() + total_order
  206. print('掌读订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  207. return total_order
  208. def get_zhangdu_order_task(st, et, account):
  209. order_list = ()
  210. url = 'https://api.zhangdu520.com/channel/getorder'
  211. uid = account[0]
  212. appsecert = account[1]
  213. channel = account[2]
  214. timestamp = int(time.time())
  215. sign = md5value(str(uid) + '&' + appsecert + '&' + str(timestamp))
  216. starttime = st
  217. timespace = 90 * 3600 * 24
  218. endtime = min(et, st + timespace)
  219. for x in range((et - st) // timespace + 1): # 分时段
  220. if x > 0:
  221. print('掌读跨天数查询:', x)
  222. params = {
  223. 'uid': uid,
  224. 'timestamp': timestamp,
  225. 'sign': sign,
  226. 'starttime': starttime,
  227. 'endtime': endtime
  228. }
  229. list1 = requests.get(url=url, params=params)
  230. pageCount = list1.json()['data']['pageCount']
  231. if pageCount == 0:
  232. continue
  233. for page in range(1, pageCount + 1): # 分页
  234. params = {
  235. 'uid': uid,
  236. 'timestamp': timestamp,
  237. 'sign': sign,
  238. 'starttime': starttime,
  239. 'endtime': endtime,
  240. 'page': page
  241. }
  242. list2 = requests.get(url=url, params=params).json()
  243. if 'data' in list2.keys():
  244. for b in list2['data']['list']:
  245. if b['status'] == '1':
  246. c = {}
  247. c['amount'] = b['amount']
  248. c['channel_id'] = uid
  249. c['order_id'] = str(b['orderno'])
  250. c['order_time'] = b['ctime']
  251. c['user_id'] = b['openid']
  252. c['platform'] = '掌读'
  253. c['channel'] = channel
  254. c['reg_time'] = b['regtime']
  255. c['from_novel'] = ''
  256. c['stage'] = ''
  257. c['date'] = ((int(b['ctime']) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  258. x = sorted(c.items(), key=lambda item: item[0])
  259. x = dict(x)
  260. x = tuple(x.values())
  261. order_list = order_list + ((x),)
  262. starttime = starttime + timespace
  263. endtime = min(et, starttime + timespace)
  264. return order_list
  265. ##《3》花生
  266. def get_huasheng_order(st, et):
  267. start_exec_seconds = date_util.getCurrentSecondTime()
  268. total_order = ()
  269. account_list = al.huasheng_account_list
  270. executor = ProcessPoolExecutor(max_workers=5)
  271. futures = []
  272. for account in account_list:
  273. url = 'https://vip.rlcps.cn/api/getMerchants'
  274. apiKEY = account[0]
  275. apiSecurity = account[1]
  276. timestamp = str(int(time.time()))
  277. sign = md5value(apiKEY + timestamp + apiSecurity).upper()
  278. params = {
  279. 'apiKey': apiKEY,
  280. 'apiSecurity': apiSecurity,
  281. 'timestamp': timestamp,
  282. 'sign': sign
  283. }
  284. merchant_list = requests.post(url, params).json()
  285. for merchant in merchant_list['data']:
  286. future = executor.submit(get_huasheng_order_task, st, et, account, merchant)
  287. futures.append(future)
  288. executor.shutdown(True)
  289. for future in futures:
  290. if len(future.result()) > 0:
  291. total_order = future.result() + total_order
  292. print('花生订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  293. return total_order
  294. def get_huasheng_order_task(st, et, account, merchant):
  295. order_list = ()
  296. apiKEY = account[0]
  297. apiSecurity = account[1]
  298. stage = account[2]
  299. timestamp = str(int(time.time()))
  300. merchant_id = merchant['merchant_id']
  301. merchant_name = merchant['merchant_name']
  302. order_url = 'https://vip.rlcps.cn/api/orderList'
  303. start_time = st
  304. for i in range((et - st) // 86400):
  305. page = 1
  306. date = time.strftime("%Y-%m-%d", time.localtime(start_time))
  307. sign = md5value(apiKEY + date + str(merchant_id) + timestamp + apiSecurity).upper()
  308. order_params = {
  309. 'apiKey': apiKEY,
  310. 'apiSecurity': apiSecurity,
  311. 'timestamp': timestamp,
  312. 'date': date,
  313. 'merchant_id': merchant_id,
  314. 'sign': sign,
  315. 'page': page
  316. }
  317. list1 = requests.post(order_url, order_params).json()
  318. if 'data' in list1.keys() and len(list1['data']) > 0:
  319. for i in range(int(math.ceil(list1['count'] / 500))):
  320. data2 = {
  321. 'apiKey': apiKEY,
  322. 'apiSecurity': apiSecurity,
  323. 'timestamp': timestamp,
  324. 'date': date,
  325. 'merchant_id': merchant_id,
  326. 'sign': sign,
  327. 'page': page
  328. }
  329. list2 = requests.post(order_url, data2).json()
  330. for x in list2['data']:
  331. if x['order_status'] == 1:
  332. y = {}
  333. ##dtime = datetime.datetime.strptime(x['pay_at'],"%Y-%m-%d")
  334. ##y['date']= ((int(time.mktime(dtime.timetuple()))+8*3600)//86400)*86400-8*3600
  335. y['user_id'] = x['openid']
  336. y['order_id'] = x['trans_id']
  337. y['order_time'] = x['pay_at']
  338. y['reg_time'] = x['join_at']
  339. y['date'] = (start_time + 8 * 3600) // 86400 * 86400 - 8 * 3600
  340. y['channel'] = merchant_name
  341. y['channel_id'] = merchant_id
  342. y['platform'] = '花生'
  343. y['stage'] = stage
  344. y['from_novel'] = x['book_name']
  345. y['amount'] = x['amount']
  346. y = sorted(y.items(), key=lambda item: item[0])
  347. y = dict(y)
  348. y = tuple(y.values())
  349. order_list = order_list + ((y),)
  350. page = page + 1
  351. start_time = start_time + 86400
  352. return order_list
  353. ##《4》掌中云
  354. def get_zzy_order(st, et):
  355. start_exec_seconds = date_util.getCurrentSecondTime()
  356. total_order = ()
  357. account_list = al.zzy_account_list
  358. # account_list = [['1108701f1d6','0f9c0f8429d1a16a8a78c2306e7a4db3','清勇7月']]
  359. executor = ProcessPoolExecutor(max_workers=5)
  360. futures = []
  361. for account in account_list:
  362. url = 'https://openapi.818tu.com/partners/channel/channels/list?'
  363. my_key = account[0]
  364. secert = account[1]
  365. my_sign = md5value(secert + 'key=' + my_key)
  366. parameter = 'key=' + my_key + '&sign=' + my_sign
  367. channel_list = requests.get(url + parameter) # 获取子渠道列表
  368. if 'data' in channel_list.json().keys():
  369. items = channel_list.json()['data']['items']
  370. else:
  371. items = []
  372. continue
  373. for item in items:
  374. # 获取channel_id 后逐个拉取历史orders
  375. future = executor.submit(get_zzy_order_task, st, et, account, item)
  376. futures.append(future)
  377. executor.shutdown(True)
  378. for future in futures:
  379. if len(future.result()) > 0:
  380. total_order = future.result() + total_order
  381. print('掌中云订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  382. return total_order
  383. def get_zzy_order_task(st, et, account, item):
  384. # 掌中云的时间格式比较特殊,转换下
  385. st = platform_util.getZzyQueryTime(st)
  386. et = platform_util.getZzyQueryTime(et)
  387. order_list = ()
  388. my_key = account[0]
  389. secert = account[1]
  390. stage = account[2]
  391. channel_id = item['id']
  392. channel = item['nickname']
  393. status = str(1)
  394. per_page = str(1000)
  395. limit_time = et
  396. get_time = st
  397. lt = parse.urlencode({'created_at[lt]': limit_time})
  398. gt = parse.urlencode({'created_at[gt]': get_time})
  399. url_1 = 'https://openapi.818tu.com/partners/channel/orders/list?'
  400. my_sign_1 = md5value(secert + 'channel_id=' + str(
  401. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&per_page=' + per_page + '&status=' + status)
  402. parameter_1 = 'channel_id=' + str(
  403. channel_id) + '&' + gt + '&' + lt + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_1
  404. orders = requests.get(url_1 + parameter_1)
  405. t = orders.json()['data']['count'] // int(per_page) + 1
  406. for page in range(1, t + 1):
  407. my_sign_2 = md5value(secert + 'channel_id=' + str(
  408. channel_id) + '&created_at[gt]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + my_key + '&page=' + str(
  409. page) + '&per_page=' + per_page + '&status=' + status)
  410. parameter_2 = 'channel_id=' + str(channel_id) + '&' + gt + '&' + lt + '&page=' + str(
  411. page) + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + my_sign_2
  412. orders_1 = requests.get(url_1 + parameter_2)
  413. b = orders_1.json()['data']['items']
  414. for a in b:
  415. c = {}
  416. c['user_id'] = str(a['member']['openid'])
  417. c['channel'] = channel
  418. c['reg_time'] = a['member']['created_at']
  419. c['channel_id'] = channel_id
  420. c['amount'] = round(a['price'] / 100, 2)
  421. c['order_id'] = str(a['id'])
  422. c['order_time'] = a['created_at']
  423. c['platform'] = '掌中云'
  424. c['stage'] = stage
  425. dtime = datetime.datetime.strptime(a['created_at'][0:10], "%Y-%m-%d")
  426. c['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  427. if str(a['from_novel_id']) != 'None':
  428. c['from_novel'] = a['from_novel']['title']
  429. else:
  430. c['from_novel'] = 'None'
  431. x = sorted(c.items(), key=lambda item: item[0])
  432. x = dict(x)
  433. x = tuple(x.values())
  434. order_list = order_list + ((x),)
  435. return order_list
  436. ##《5》 悠书阁
  437. def get_ysg_order(st, et):
  438. start_exec_seconds = date_util.getCurrentSecondTime()
  439. total_order = ()
  440. account_list = al.ysg_account_list
  441. executor = ProcessPoolExecutor(max_workers=5)
  442. futures = []
  443. for account in account_list:
  444. future = executor.submit(get_ysg_order_task, st, et, account)
  445. futures.append(future)
  446. executor.shutdown(True)
  447. for future in futures:
  448. if len(future.result()) > 0:
  449. total_order = future.result() + total_order
  450. print('悠书阁订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  451. return total_order
  452. def get_ysg_order_task(st, et, account):
  453. order_list = ()
  454. url = 'https://novel.youshuge.com/v2/open/orders'
  455. # 超过100条就需要分页,别问我为什么知道,看代码看出来的
  456. max_page_size = 100
  457. host_name = account[0]
  458. channel_id = account[1]
  459. secert_key = account[2]
  460. channel = account[3]
  461. stage = account[4]
  462. timestamp = int(time.time())
  463. start_date = time.strftime("%Y-%m-%d", time.localtime(st))
  464. end_date = time.strftime("%Y-%m-%d", time.localtime(et))
  465. page = 1
  466. str1 = 'channel_id=' + str(channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
  467. page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
  468. sign = md5value(str1).upper()
  469. data = {
  470. 'sign': sign,
  471. 'host_name': host_name,
  472. 'time': timestamp,
  473. 'channel_id': channel_id,
  474. 'page': page,
  475. 'pay_status': 1,
  476. 'start_date': start_date,
  477. 'end_date': end_date
  478. }
  479. respone = requests.post(url, data)
  480. if respone.status_code == 400:
  481. print('respone', respone)
  482. result_json = respone.json()
  483. first_page_order = build_ysg_order_data(channel, channel_id, result_json, stage)
  484. order_list = order_list + first_page_order
  485. if len(first_page_order) == 0:
  486. return order_list
  487. total_count = result_json['data'][0]['count']
  488. if total_count > max_page_size:
  489. for i in range((total_count - 1) // max_page_size + 1):
  490. timestamp = int(time.time())
  491. str1 = 'channel_id=' + str(
  492. channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
  493. page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
  494. sign = md5value(str1).upper()
  495. data2 = {
  496. 'sign': sign,
  497. 'host_name': host_name,
  498. 'time': timestamp,
  499. 'channel_id': channel_id,
  500. 'page': page,
  501. 'pay_status': 1,
  502. 'start_date': start_date,
  503. 'end_date': end_date
  504. }
  505. r2 = requests.post(url, data2).json()
  506. order_list = order_list + build_ysg_order_data(channel, channel_id, r2, stage)
  507. page = page + 1
  508. return order_list
  509. def build_ysg_order_data(channel, channel_id, result_json, stage):
  510. order_list = ()
  511. if 'data' in result_json.keys():
  512. data = result_json['data']
  513. if len(data) > 0:
  514. for x in data:
  515. y = {}
  516. dtime = datetime.datetime.strptime(x['create_time'][0:10], "%Y-%m-%d")
  517. y['date'] = ((int(
  518. time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
  519. y['order_id'] = x['order_num']
  520. y['amount'] = round(int(x['price']) / 100, 2)
  521. y['order_time'] = x['create_time']
  522. y['channel'] = channel
  523. y['from_novel'] = x['book_name']
  524. y['stage'] = stage
  525. y['user_id'] = x['openid']
  526. y['channel_id'] = channel_id
  527. y['platform'] = '悠书阁'
  528. y['reg_time'] = x['reg_time']
  529. y = sorted(y.items(), key=lambda item: item[0])
  530. y = dict(y)
  531. y = tuple(y.values())
  532. order_list = order_list + ((y),)
  533. return order_list
  534. # 数据导入表采用replace替换主键orderid的方法
  535. def mysql_insert_order(data):
  536. if data is None or len(data) == 0:
  537. print('数据为空,不执行数据库操作!')
  538. else:
  539. sql = 'replace into quchen_text.order (amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  540. connect = MySQLConnection('test')
  541. try:
  542. num = connect.batch(sql, data)
  543. # 提交
  544. connect.commit()
  545. print(num, '条订单数据入库成功')
  546. except Exception as e:
  547. print('订单数据入库失败:', e)
  548. finally:
  549. connect.close()
  550. # 获取各平台的订单数量
  551. def mysql_select_platform_order_count(date):
  552. sql = 'SELECT platform, COUNT(1) AS num FROM `order` WHERE date = %s GROUP BY platform'
  553. connect = MySQLConnection('test')
  554. platform_order_count = []
  555. try:
  556. platform_order_count = connect.query(sql, date)
  557. return platform_order_count
  558. except Exception as e:
  559. print('各平台的订单数据查询失败:', e)
  560. finally:
  561. connect.close()
  562. return platform_order_count
  563. def start_all_job():
  564. start_exec_seconds = date_util.getCurrentSecondTime()
  565. platform_order_num_list = mysql_select_platform_order_count(date_util.getYesterdayStartTime())
  566. st_unix = date_util.getYesterdayStartTime()
  567. et_unix = date_util.getTodayStartTime()
  568. print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
  569. print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))
  570. # order_list = get_yuewen_order(st_unix, et_unix)
  571. # mysql_insert_order(order_list)
  572. if len(platform_order_num_list) != 0:
  573. print('本地库中没有任何数据,现在全平台补全')
  574. mysql_insert_order(get_zzy_order(st_unix, et_unix))
  575. mysql_insert_order(get_yuewen_order(st_unix, et_unix))
  576. mysql_insert_order(get_huasheng_order(st_unix, et_unix))
  577. mysql_insert_order(get_ysg_order(st_unix, et_unix))
  578. mysql_insert_order(get_zhangdu_order(st_unix, et_unix))
  579. else:
  580. platform_list = ['阅文','悠书阁','掌读','掌中云','花生']
  581. for platform_order_num in platform_order_num_list:
  582. platform = str(platform_order_num['platform'])
  583. num = int(platform_order_num['num'])
  584. platform_list.remove(platform)
  585. if platform == '阅文':
  586. order_list = get_yuewen_order(st_unix, et_unix)
  587. if len(order_list) != num:
  588. print('阅文数据实际订单和已经入库数据差异:', len(order_list) - num)
  589. mysql_insert_order(order_list)
  590. elif platform == '悠书阁':
  591. order_list = get_ysg_order(st_unix, et_unix)
  592. if len(order_list) != num:
  593. print('悠书阁数据实际订单和已经入库数据差异:', len(order_list) - num)
  594. mysql_insert_order(order_list)
  595. elif platform == '掌读':
  596. order_list = get_zhangdu_order(st_unix, et_unix)
  597. if len(order_list) != num:
  598. print('掌读数据实际订单和已经入库数据差异:', len(order_list) - num)
  599. mysql_insert_order(order_list)
  600. elif platform == '掌中云':
  601. order_list = get_zzy_order(st_unix, et_unix)
  602. if len(order_list) != num:
  603. print('掌中云数据实际订单和已经入库数据差异:', len(order_list) - num)
  604. mysql_insert_order(order_list)
  605. elif platform == '花生':
  606. order_list = get_huasheng_order(st_unix, et_unix)
  607. if len(order_list) != num:
  608. print('花生数据实际订单和已经入库数据差异:', len(order_list) - num)
  609. mysql_insert_order(order_list)
  610. else:
  611. print('发现未知平台数据!', platform_order_num)
  612. for platform in platform_list:
  613. if platform == '阅文':
  614. print('阅文没有数据')
  615. mysql_insert_order(get_yuewen_order(st_unix, et_unix))
  616. elif platform == '悠书阁':
  617. print('悠书阁没有数据')
  618. mysql_insert_order(get_yuewen_order(st_unix, et_unix))
  619. elif platform == '掌读':
  620. print('掌读没有数据')
  621. mysql_insert_order(get_zhangdu_order(st_unix, et_unix))
  622. elif platform == '掌中云':
  623. print('掌中云没有数据')
  624. mysql_insert_order(get_zzy_order(st_unix, et_unix))
  625. elif platform == '花生':
  626. print('花生没有数据')
  627. mysql_insert_order(get_huasheng_order(st_unix, et_unix))
  628. else:
  629. print('什么鬼平台:', platform)
  630. print('订单检查执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
# Script entry point: run the daily order check only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    start_all_job()