get_order_wending_new.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import hashlib
  2. import time
  3. import datetime
  4. from concurrent.futures import ProcessPoolExecutor
  5. import requests
  6. from util import date_util
  7. from util import platform_config_util
  8. from util import robust_util
  9. from util.MySQLConnection import MySQLConnection
  def md5(s):
      """Return the hexadecimal MD5 digest of ``s`` encoded as UTF-8."""
      return hashlib.md5(s.encode("utf-8")).hexdigest()
  def getSelfDateStr(times=None, date_format='%Y%m%d'):
      """Format an epoch timestamp as a date string in the local timezone.

      Added 2020-10-28: the Yangguang and Wending APIs take date parameters in
      forms such as ``20201028``, and their data is pulled once a day.

      Args:
          times: Seconds since the epoch. ``None`` (the default) means the
              current time, evaluated on every call.
          date_format: A ``time.strftime`` format string.

      Returns:
          The given instant rendered with ``date_format`` in local time. No
          day offset is applied: a timestamp on 2019-05-12 yields
          ``'20190512'``.
      """
      # A ``time.time()`` default is evaluated only once, when the module is
      # imported, which would freeze "now" for the lifetime of the process.
      if times is None:
          times = time.time()
      return time.strftime(date_format, time.localtime(times))
  def get_wending_account_list():
      """Load the Wending account records stored in ``order_account_text``.

      Each returned row's ``text`` value has its newline characters removed and
      is then split on commas. Other functions in this file read the resulting
      fields by position: 0 consumer key, 1 secret key, 2 site id, 3 stage.

      Returns:
          A list containing one list of strings per row of the query result.
      """
      query = "select text from order_account_text where platform='文鼎'"
      rows = MySQLConnection().query(query)
      return [row['text'].replace('\n', '').split(',') for row in rows]
  def get_wending_order(st, et, account_list):
      """Fetch the orders of every account in turn and merge them into one tuple.

      Args:
          st: Start of the time window, forwarded to ``get_wending_order_task``.
          et: End of the time window, forwarded to ``get_wending_order_task``.
          account_list: Iterable of account records, one per task call.

      Returns:
          A tuple holding the rows of all accounts. Each account's rows are
          placed in front of those gathered so far, so later accounts come
          first.
      """
      began_at = date_util.getCurrentSecondTime()

      # Every account is fetched (sequentially) before any merging starts.
      per_account = [get_wending_order_task(st, et, acct) for acct in account_list]

      merged = ()
      for batch in per_account:
          if len(batch) == 0:
              continue
          merged = batch + merged

      elapsed = date_util.getCurrentSecondTime() - began_at
      print('文鼎订单数量:', len(merged), '执行时长(秒):', elapsed)
      return merged
  def get_wd_account_siteid_list(account):
      """Return the ``mpId`` of every site listed for one Wending account.

      Args:
          account: Account fields read by position: index 0 is the consumer
              key, index 1 the secret key, index 3 the stage label that is
              printed when the response cannot be processed.

      Returns:
          A list of the ``mpId`` values found in the ``data`` member of the
          ``siteList`` response. The list is empty when the body is not JSON
          or has no ``data`` member, and partial when an entry is malformed
          (the raw payload is printed in that case).
      """
      url = 'https://bi.reading.163.com/dist-api/siteList'
      consumerkey = account[0]
      secretkey = account[1]
      stage = account[3]
      timestamp = int(time.time() * 1000)

      # The signature is the MD5 of all parameters, secret key included,
      # written as "key=value" in key order and concatenated with no separator.
      sign_fields = {
          'consumerkey': consumerkey,
          'secretkey': secretkey,
          'timestamp': timestamp,
      }
      sign_text = ''.join(f'{key}={value}' for key, value in sorted(sign_fields.items()))
      sign = md5(sign_text).lower()

      # The secret key takes part in the signature only; it is not sent.
      get_url = f'{url}?consumerkey={consumerkey}&timestamp={timestamp}&sign={sign}'

      while True:
          r = requests.get(url=get_url)
          if r.status_code == 200:
              break
          # Wait before retrying, as get_wending_json_object does, instead of
          # re-requesting in a tight loop while the service is failing.
          time.sleep(1)
          print("请求连接出错,等待1s...")

      # Decode once so the diagnostic below cannot fail on a second parse.
      try:
          payload = r.json()
          id_key_list = payload['data']
      except (ValueError, KeyError, TypeError):
          # Not JSON, or JSON without a usable "data" member.
          return []

      mpid_list = []
      try:
          for id_key_val in id_key_list:
              mpid_list.append(dict(id_key_val)["mpId"])
      except (TypeError, ValueError, KeyError):
          # Keep whatever was collected and show the unexpected payload.
          print(stage, '站点查询返回结果:', payload)
      return mpid_list
  def get_wending_json_object(url, params):
      """Sign a Wending API request, send it with GET and return the decoded JSON.

      The request is repeated, with a one-second pause between attempts, until
      the server answers with HTTP 200.

      Args:
          url: Endpoint URL without a query string.
          params: Request parameters. Must contain ``consumerkey``, ``siteid``,
              ``pageSize``, ``starttime``, ``endtime``, ``page`` and
              ``paystatus``; ``secretkey`` is expected as well but only takes
              part in the signature. ``params['timestamp']`` is overwritten in
              place with the current time in milliseconds.

      Returns:
          The response body decoded from JSON.

      Raises:
          KeyError: If one of the query fields is missing from ``params``.
      """
      # NOTE(review): the URL is still published as a module-level global in
      # case code outside this function reads it; consider removing it.
      global get_url

      # A signature left behind by an earlier call must never be folded into
      # the new one, and the caller's dict is no longer used to carry it.
      params.pop('sign', None)
      params['timestamp'] = int(time.time() * 1000)

      # MD5 over every parameter as "key=value", in key order, no separator.
      sign_text = ''.join(f'{key}={value}' for key, value in sorted(params.items()))
      sign = md5(sign_text).lower()

      # Fields sent to the server, in the order the query string is built.
      query_fields = (
          'consumerkey',
          'timestamp',
          'siteid',
          'pageSize',
          'starttime',
          'endtime',
          'page',
          'paystatus',
      )
      parameter = '&'.join(f'{name}={params[name]}' for name in query_fields)
      parameter += f'&sign={sign}'
      get_url = url + "?" + parameter

      while True:
          r = requests.get(url=get_url)
          if r.status_code == 200:
              break
          time.sleep(1)
          print("请求连接出错,等待1s...")
      return r.json()
  def get_wending_order_task(st, et, account):
      """Download the paid recharge orders of one account within a time window.

      Args:
          st: Start of the window in seconds since the epoch.
          et: End of the window in seconds since the epoch.
          account: Account fields read by position: 0 consumer key, 1 secret
              key, 2 site id used when the site lookup returns nothing,
              3 stage label.

      Returns:
          A tuple of order rows. Each row is a tuple of values ordered by field
          name: amount, channel, channel_id, date, from_novel, order_id,
          order_time, platform, reg_time, stage, user_id.
      """
      url = 'https://bi.reading.163.com/dist-api/rechargeList'
      page_size = 1000
      consumerkey = account[0]
      secretkey = account[1]
      siteid = account[2]
      stage = account[3]

      siteid_list = get_wd_account_siteid_list(account)
      print(siteid_list)
      if len(siteid_list) == 0:
          # Fall back to the site id stored with the account.
          siteid_list.append(siteid)

      starttime = getSelfDateStr(st, '%Y%m%d%H%M')
      endtime = getSelfDateStr(et, '%Y%m%d%H%M')

      # Collected in a list: extending a tuple row by row copies it each time.
      rows = []
      for siteid in siteid_list:
          page = 1
          while True:
              params = {
                  'consumerkey': consumerkey,
                  'secretkey': secretkey,
                  # Placeholder; get_wending_json_object replaces it before signing.
                  'timestamp': int(1601481600),
                  'siteid': siteid,
                  'pageSize': page_size,
                  'starttime': starttime,
                  'endtime': endtime,
                  'page': page,
                  'paystatus': 1,
              }
              response_result_json = get_wending_json_object(url, params)
              order_item_list = response_result_json['data']['rechargeList']
              rows.extend(_wending_order_row(item, stage) for item in order_item_list)
              # A page that is not full is the last one.
              if len(order_item_list) < page_size:
                  break
              page += 1

      order_list = tuple(rows)
      # NOTE(review): the source indentation was lost; this summary is assumed
      # to be printed once per account rather than once per site - confirm.
      print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
      return order_list


  def _wending_order_row(x, stage):
      """Convert one ``rechargeList`` item into a row tuple ordered by field name."""
      time_format = "%Y-%m-%d %H:%M:%S"
      # The API reports times as millisecond timestamps.
      create_time = time.strftime(time_format, time.localtime(x['createTime'] // 1000))
      reg_time = time.strftime(time_format, time.localtime(x['userRegisterTime'] // 1000))
      record = {
          'platform': '文鼎',
          'channel': x['wx_mpName'],         # official-account name
          'channel_id': x['wx_originalId'],  # official-account id
          'from_novel': x['bookTitle'],      # novel title
          'user_id': x['userId'],            # paying user's id
          'stage': stage,
          'order_time': create_time,         # when the order was created
          'amount': x['money'] / 100,        # the API amount is in fen (1/100 yuan)
          'reg_time': reg_time,              # when the user registered
          'order_id': x['ewTradeId'],
          'date': create_time,
      }
      # Values are emitted in key order.
      return tuple(record[key] for key in sorted(record))
  def batch_save_order_new(data):
      """Insert order rows into ``quchen_text.ods_order`` in one batch.

      Nothing is sent to the database when ``data`` is ``None`` or empty.
      A failure during the insert or commit is printed, not raised, and the
      connection is closed in either case.

      Args:
          data: Sequence of 11-value rows in the column order of the INSERT
              statement below.
      """
      if data is None or len(data) == 0:
          print('数据为空,不执行数据库操作!')
          return

      insert_sql = (
          'INSERT IGNORE INTO quchen_text.ods_order'
          '(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id)'
          ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
      )
      db = MySQLConnection()
      try:
          inserted = db.batch(insert_sql, data)
          db.commit()
          print('订单数据最终入库【{num}】条'.format(num=inserted))
      except Exception as err:
          print('订单数据入库失败:', err)
      finally:
          db.close()
  def start_order_job_wending(st, et):
      """Run the Wending order job: load accounts, fetch their orders, store them.

      Args:
          st: Start of the time window, passed through to ``get_wending_order``.
          et: End of the time window, passed through to ``get_wending_order``.
      """
      accounts = get_wending_account_list()
      batch_save_order_new(get_wending_order(st, et, accounts))
  if __name__ == '__main__':
      # start_order_job_wending() requires a start and an end timestamp; calling
      # it without arguments raised TypeError on every script run.
      # NOTE(review): the window defaults to the previous local calendar day,
      # assumed from the once-a-day pull described in this file - confirm.
      today = datetime.date.today()
      yesterday = today - datetime.timedelta(days=1)
      start_order_job_wending(
          time.mktime(yesterday.timetuple()),
          time.mktime(today.timetuple()),
      )