get_order_dairly_wending.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # 20201104
  4. ## 20201106新添加 文鼎 平台接口
  5. ########################## 文鼎订单接口,返回数据格式
  6. '''
  7. {
  8. "code": 200,
  9. "data": {
  10. "totalPage": 1,
  11. "rechargeList": [
  12. {
  13. "userId": 1067,
  14. "nickName": "海洁测试!",
  15. "userFollowTime": 1512717067449,
  16. "wx_originalId": "gh_3f1fc031329d",
  17. "wx_mpName": "测试公众号",
  18. "wx_user_openId": "odjo61rGnt6Sgl6CeWhPZTfve7eA",
  19. "rechargeUuid": "1fd47b51-1256-43ce-a72d-3266e23235ba",
  20. "rechargeMethod": 1,
  21. "money": 100,
  22. "createTime": 1512717069449,
  23. "payStatus": 0
  24. },
  25. ]
  26. '''
  27. ##########################
  28. import hashlib
  29. import time
  30. import datetime
  31. from concurrent.futures import ProcessPoolExecutor
  32. import requests
  33. from util import date_util
  34. from util import platform_config_util ## 账号配置
  35. from util import robust_util
  36. from apscheduler.schedulers.blocking import BlockingScheduler
  37. from util.MySQLConnection import MySQLConnection
  38. #import random
  39. #import math
  40. #from urllib import parse
  41. def md5(s):
  42. md5 = hashlib.md5()
  43. md5.update(s.encode("utf-8"))
  44. return md5.hexdigest()
  45. def getSelfDateStr(times=time.time(),date_format='%Y%m%d'):
  46. """
  47. ## 20201028添加,阳光接口,文鼎接口,日期参数请求格式20201028,一日一拉api数据
  48. description: 获取指定时间戳
  49. time: 秒 默认当前时间
  50. return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
  51. tips: 一天86400秒
  52. """
  53. timestamps = str(time.strftime(date_format,time.localtime(times)))
  54. return timestamps
  55. def get_wending_account_list():
  56. """
  57. des cription: 文鼎账号列表
  58. return: [['consumerkey', 'secretkey', 'siteid', 'stage', 'account']] ->list
  59. """
  60. return platform_config_util.get_account_list('文鼎', 'wending_account_config.csv')
  61. ## 5线程并发
  62. @robust_util.catch_exception
  63. def get_wending_order(st,et):
  64. total_order_list = ()
  65. start_exec_seconds = date_util.getCurrentSecondTime()
  66. account_list = get_wending_account_list()
  67. executor = ProcessPoolExecutor(max_workers=5)
  68. futures = []
  69. for account in account_list:
  70. future = executor.submit(get_wending_order_task, st, et, account)
  71. futures.append(future)
  72. executor.shutdown(True)
  73. for future in futures:
  74. order_list = future.result()
  75. if len(order_list) > 0:
  76. total_order_list = order_list + total_order_list
  77. print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  78. return total_order_list
  79. ## 获取json对象
  80. def get_wending_json_object(url,params):
  81. params['timestamp'] = int(time.time()*1000)
  82. sorted_data = sorted(params.items(),reverse = False)
  83. s=""
  84. for k,v in sorted_data:
  85. s = s+str(k)+"="+str(v)
  86. sign = md5(s).lower()
  87. params['sign'] = sign
  88. consumerkey = params['consumerkey']
  89. secretkey = params['secretkey']
  90. timestamp = params['timestamp']
  91. siteid = params['siteid']
  92. pageSize = params['pageSize']
  93. starttime = params['starttime']
  94. endtime = params['endtime']
  95. page = params['page']
  96. paystatus = params['paystatus']
  97. ## +'&secretkey='+str(secretkey)
  98. parameter = 'consumerkey='+str(consumerkey)+'&timestamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\
  99. +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&paystatus='+str(paystatus)+'&sign='+str(sign)
  100. global get_url
  101. get_url = url+"?"+parameter
  102. response_result_json = requests.get(url=get_url).json()
  103. #response_result_json = requests.get(url=url, params=params).json()
  104. del params['sign']
  105. return response_result_json
  106. ## 具体文鼎任务task
  107. def get_wending_order_task(st,et,account):
  108. order_list = ()
  109. url = 'https://bi.reading.163.com/dist-api/rechargeList'
  110. ## 接口鉴权参数
  111. consumerkey = account[0]
  112. secretkey = account[1]
  113. ## 订单参数
  114. siteid = account[2]
  115. pageSize = 1000
  116. page = 1
  117. paystatus = 1
  118. ## 期数自定义保存到记录中
  119. stage = account[3]
  120. while True:
  121. if st >= et:
  122. break
  123. starttime = getSelfDateStr(st,'%Y%m%d%H%M')
  124. endtime = getSelfDateStr(et,'%Y%m%d%H%M')
  125. params = {
  126. 'consumerkey': consumerkey,
  127. 'secretkey':secretkey,
  128. 'timestamp':int(1601481600),
  129. 'siteid':siteid,
  130. 'pageSize':pageSize,
  131. 'starttime':starttime,
  132. 'endtime':endtime,
  133. 'page':page,
  134. 'paystatus':paystatus
  135. }
  136. response_result_json = get_wending_json_object(url,params)
  137. code = response_result_json['code']
  138. if code != 200:
  139. print('文鼎查询充值接口异常:',response_result_json,'传入参数', params,"请求url",get_url)
  140. break
  141. totalPag = response_result_json['data']['totalPage']
  142. if totalPag <= 0:
  143. break
  144. for page in range(1,totalPag+1):
  145. params['page'] = page
  146. response_result_json = get_wending_json_object(url,params)
  147. order_item_list = response_result_json['data']['rechargeList']
  148. '''
  149. print(order_item_list)
  150. [{
  151. 'userId': 48267585, 'userRegisterTime': 1568671618894, 'nickName': '\ue04a邓泽群\ue04a爱花园', 'ip': '124.13.64.179'
  152. , 'userAgent': 'Mozilla/5.0 (Linux; Android 9; /arm64'
  153. , 'userFollowTime': 1568671618894, 'wx_originalId': 'gh_0beeff4c0d70', 'wx_mpName': '美语阅读', 'wx_user_openId': 'oWL046E86PdO9wX34naL6IIwfaVc'
  154. , 'rechargeUuid': '38fc210f-8e7d-4ae5-a395-d33c2a80e234', 'sourceUuid': 'ts_b4651ee782e94cce8fc6def301de1367_4', 'bookTitle': '绝代医相'
  155. , 'ewTradeId': '4200000687202010012870758048', 'payTime': 1601565287000, 'rechargeMethod': 1, 'money': 2900, 'createTime': 1601565278043
  156. , 'updateTime': 1601565288057, 'payStatus': 1
  157. }]
  158. '''
  159. for x in order_item_list:
  160. y={}
  161. y['date'] = (int(x['payTime']//1000)+ 8 * 3600) // 86400 * 86400 - 8 * 3600 ## 网易的是13位时间戳
  162. y['platform'] = '文鼎'
  163. y['channel'] = x['wx_mpName']
  164. y['channel_id'] = x['wx_originalId']
  165. y['from_novel'] = x['bookTitle']
  166. y['user_id'] = x['userId']
  167. y['stage'] = stage
  168. createTime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['createTime']//1000))
  169. y['order_time']= createTime
  170. y['amount']=x['money']/100 ## 原数据单位:分
  171. uid_reg_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['userRegisterTime']//1000))
  172. y['reg_time']= uid_reg_time
  173. y['order_id']= x['rechargeUuid']
  174. y = sorted(y.items(), key=lambda item:item[0])
  175. y = dict(y)
  176. y = tuple(y.values())
  177. order_list = order_list+((y),)
  178. if totalPag==params['page']:
  179. break
  180. print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条,例如》{order_list[0:1]}")
  181. return order_list
  182. def batch_save_order(data):
  183. if data is None or len(data) == 0:
  184. print('数据为空,不执行数据库操作!')
  185. else:
  186. sql = 'INSERT IGNORE INTO quchen_text.order_zwg(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  187. connect = MySQLConnection()
  188. try:
  189. num = connect.batch(sql, data)
  190. # 提交
  191. connect.commit()
  192. print('订单数据最终入库【{num}】条'.format(num=num))
  193. except Exception as e:
  194. print('订单数据入库失败:', e)
  195. finally:
  196. connect.close()
  197. def start_order_job():
  198. start_exec_seconds = date_util.getCurrentSecondTime()
  199. #st_unix, et_unix = date_util.getPreviousHourAndCurrentHourSecondTime(start_exec_seconds)
  200. # st_unix = 1602313200 # 2020/10/10 15:0:0
  201. # et_unix = 1602316800 # 2020/10/10 16:0:0
  202. #print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
  203. #print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))
  204. ## 20201028添加阳光平台
  205. st_unix = date_util.getYesterdayStartTime()
  206. et_unix = date_util.getTodayStartTime()
  207. batch_save_order(get_wending_order(st_unix, et_unix))
  208. print('订单同步执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  209. start_order_job()
  210. '''
  211. start_job_time = '2020-11-06 10:04:00'
  212. if __name__ == '__main__':
  213. scheduler = BlockingScheduler()
  214. scheduler.add_job(start_order_job, 'interval', days =1 ,start_date=start_job_time)
  215. #scheduler.add_job(start_order_job, 'interval',days =1,hours = 2,minutes = 0,seconds = 0)
  216. #线上是24h执行一次
  217. scheduler.start()
  218. '''
  219. '''
  220. account_list = [
  221. ['68442881','RFygHhX16LEYYe8i','1014108','趣程20期','qucheng20qi@163.com'],
  222. ['77257999','86nPtJdYLe1k81gE','1021116','趣程21期','qucheng21qi@163.com']
  223. ]
  224. for account in account_list:
  225. get_wending_order_task(st=int(1601481600),et=int(1601568000),account=account)
  226. '''