# get_order_dairly_wending.py 8.5 KB
  1. import hashlib
  2. import time
  3. import datetime
  4. from concurrent.futures import ProcessPoolExecutor
  5. import requests
  6. from util import date_util
  7. from util import platform_config_util
  8. from util import robust_util
  9. from util.MySQLConnection import MySQLConnection
  10. def md5(s):
  11. md5 = hashlib.md5()
  12. md5.update(s.encode("utf-8"))
  13. return md5.hexdigest()
  14. def getSelfDateStr(times=time.time(),date_format='%Y%m%d'):
  15. """
  16. ## 20201028添加,阳光接口,文鼎接口,日期参数请求格式20201028,一日一拉api数据
  17. description: 获取指定时间戳
  18. time: 秒 默认当前时间
  19. return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
  20. tips: 一天86400秒
  21. """
  22. timestamps = str(time.strftime(date_format,time.localtime(times)))
  23. return timestamps
  24. def get_wending_account_list():
  25. """
  26. des cription: 文鼎账号列表
  27. return: [['consumerkey', 'secretkey', 'siteid', 'stage', 'account']] ->list
  28. """
  29. return platform_config_util.get_account_list('文鼎', 'wending_account_config.csv')
  30. def get_wending_order(st,et,account_list):
  31. total_order_list = ()
  32. start_exec_seconds = date_util.getCurrentSecondTime()
  33. futures = []
  34. for account in account_list:
  35. futures.append(get_wending_order_task(st, et, account))
  36. for future in futures:
  37. if len(future)>0:
  38. total_order_list = future + total_order_list
  39. print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  40. print(total_order_list)
  41. return total_order_list
  42. def get_wd_account_siteid_list(account):
  43. url = 'https://bi.reading.163.com/dist-api/siteList'
  44. consumerkey = account[0]
  45. secretkey = account[1]
  46. stage = account[3]
  47. timestamp = int(time.time()*1000)
  48. siteid_params = {
  49. "consumerkey":consumerkey,
  50. 'secretkey':secretkey,
  51. 'timestamp':timestamp,
  52. }
  53. sorted_data = sorted(siteid_params.items(),reverse = False)
  54. s=""
  55. for k,v in sorted_data:
  56. s = s+str(k)+"="+str(v)
  57. sign = md5(s).lower()
  58. siteid_params['sign'] = sign
  59. consumerkey = siteid_params['consumerkey']
  60. timestamp = siteid_params['timestamp']
  61. parameter = 'consumerkey='+str(consumerkey)+'&timestamp='+str(timestamp)+'&sign='+str(sign)
  62. get_url = url+"?"+parameter
  63. while True:
  64. r = requests.get(url=get_url)
  65. if r.status_code==200:
  66. break
  67. try:
  68. id_key_list = r.json()['data']
  69. except:
  70. return []
  71. mpid_list = []
  72. try:
  73. for id_key_val in id_key_list:
  74. mpid = dict(id_key_val)["mpId"]
  75. mpid_list.append(mpid)
  76. except Exception as e:
  77. print(stage,'站点查询返回结果:',r.json())
  78. return mpid_list
  79. def get_wending_json_object(url,params):
  80. params['timestamp'] = int(time.time()*1000)
  81. sorted_data = sorted(params.items(),reverse = False)
  82. s=""
  83. for k,v in sorted_data:
  84. s = s+str(k)+"="+str(v)
  85. sign = md5(s).lower()
  86. params['sign'] = sign
  87. consumerkey = params['consumerkey']
  88. secretkey = params['secretkey']
  89. timestamp = params['timestamp']
  90. siteid = params['siteid']
  91. pageSize = params['pageSize']
  92. starttime = params['starttime']
  93. endtime = params['endtime']
  94. page = params['page']
  95. paystatus = params['paystatus']
  96. ## +'&secretkey='+str(secretkey)
  97. parameter = 'consumerkey='+str(consumerkey)+'&timestamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\
  98. +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&paystatus='+str(paystatus)+'&sign='+str(sign)
  99. global get_url
  100. get_url = url+"?"+parameter
  101. while True:
  102. r= requests.get(url=get_url)
  103. if r.status_code==200:
  104. break
  105. else:
  106. time.sleep(1)
  107. print("请求连接出错,等待1s...")
  108. response_result_json=r.json()
  109. del params['sign']
  110. return response_result_json
  111. def get_wending_order_task(st,et,account):
  112. order_list = ()
  113. url = 'https://bi.reading.163.com/dist-api/rechargeList'
  114. consumerkey = account[0][1:]
  115. print(consumerkey)
  116. secretkey = account[1]
  117. siteid=account[2]
  118. stage = account[3]
  119. siteid_list = get_wd_account_siteid_list(account)
  120. print(siteid_list)
  121. if len(siteid_list) == 0:
  122. siteid_list.append(siteid)
  123. starttime = getSelfDateStr(st,'%Y%m%d%H%M')
  124. endtime = getSelfDateStr(et,'%Y%m%d%H%M')
  125. for siteid in siteid_list:
  126. page = 1
  127. while True:
  128. params = {
  129. 'consumerkey': consumerkey,
  130. 'secretkey':secretkey,
  131. 'timestamp':int(1601481600),
  132. 'siteid':siteid,
  133. 'pageSize':1000,
  134. 'starttime':starttime,
  135. 'endtime':endtime,
  136. 'page':page,
  137. 'paystatus':1}
  138. response_result_json = get_wending_json_object(url,params)
  139. order_item_list = response_result_json['data']['rechargeList']
  140. for x in order_item_list:
  141. y={}
  142. y['date'] = (int(x['payTime']//1000)+ 8 * 3600) // 86400 * 86400 - 8 * 3600 ## 网易的是13位时间戳
  143. y['platform'] = '文鼎'
  144. y['channel'] = x['wx_mpName'] ## 公众号名称
  145. y['channel_id'] = x['wx_originalId'] ## 公众号id
  146. y['from_novel'] = x['bookTitle'] ## 小说名称
  147. y['user_id'] = x['userId'] ## 付费用户uid
  148. y['stage'] = stage ## 期数
  149. createTime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['createTime']//1000)) ## 时间戳 》struct_time 》标准时间
  150. y['order_time']= createTime ## 订单生成时间
  151. y['amount']=x['money']/100 ## 原数据单位:分
  152. uid_reg_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['userRegisterTime']//1000)) ## 13位时间戳 》标准时间
  153. y['reg_time']= uid_reg_time ## 用户注册时间
  154. y['order_id']= x['ewTradeId'] ## 订单id
  155. y = sorted(y.items(), key=lambda item:item[0])
  156. y = dict(y)
  157. y = tuple(y.values())
  158. order_list = order_list+((y),)
  159. if len(order_item_list)<1000:
  160. break
  161. else:
  162. page+=1
  163. print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
  164. return order_list
  165. def batch_save_order(data):
  166. if data is None or len(data) == 0:
  167. print('数据为空,不执行数据库操作!')
  168. else:
  169. sql = 'INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  170. connect = MySQLConnection()
  171. try:
  172. num = connect.batch(sql, data)
  173. # 提交
  174. connect.commit()
  175. print('订单数据最终入库【{num}】条'.format(num=num))
  176. except Exception as e:
  177. print('订单数据入库失败:', e)
  178. finally:
  179. connect.close()
  180. def batch_save_order_new(data):
  181. if data is None or len(data) == 0:
  182. print('数据为空,不执行数据库操作!')
  183. else:
  184. sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  185. connect = MySQLConnection()
  186. try:
  187. num = connect.batch(sql, data)
  188. # 提交
  189. connect.commit()
  190. print('订单数据最终入库【{num}】条'.format(num=num))
  191. except Exception as e:
  192. print('订单数据入库失败:', e)
  193. finally:
  194. connect.close()
  195. def start_order_job():
  196. st_unix = date_util.get_n_day(n=-1, is_timestamp=1)
  197. et_unix = date_util.get_n_day(is_timestamp=1)
  198. account_list = get_wending_account_list()
  199. print(account_list)
  200. batch_save_order(get_wending_order(st_unix, et_unix,account_list))
  201. batch_save_order_new(get_wending_order(st_unix, et_unix,account_list))
  202. if __name__ == '__main__':
  203. start_order_job()