get_order_dairly_yangguang.py 11 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # 20201028
  4. ## 20201027新添加 阳光 平台接口
  5. ########################## 阳光接口返回数据格式
  6. '''
  7. {
  8. "error_code": 0, ## 错误码 0 成功
  9. "error_msg": "", ## 错误信息
  10. "data":{ ## 数据内容
  11. "last_id": 1, ## 最后一条数据的 ID 请求下一页数据时进行传递
  12. "count": 500, ## 此次查询数据量
  13. "push_time": 1570701536, ## 返回时间戳
  14. "list": [ ## 数据列表
  15. {
  16. "merchant_id": "20180627151621_1_UdyF", ## 商户订单号
  17. "transaction_id": "4200000122201806277014647642", ## 交易单号
  18. "type": "1", ## 订单类型,'1'=>书币充值 '2'=>VIP 充值
  19. "money": "50", ## 总额
  20. "state": "1", ## 订单状态,'0'=>未完成 '1' => 完成
  21. "from": "0", ##
  22. "create_time": "1530083789", ## 下单时间
  23. "finish_time": "1530083789", ## 完成时间
  24. "book_name": "一品邪少", ## 书名
  25. "book_tags": "现代都市", ##
  26. "referral_url": "/t/392109", ## 推广链接
  27. "user_id": "112333", ## 用户 id
  28. "channel_id": "1231" ## 渠道 id
  29. user_createtime ## 用户注册时间
  30. openid ## 用户 openid
  31. },
  32. ]
  33. '''
  34. ##########################
  35. import random
  36. import hashlib
  37. import time
  38. from concurrent.futures import ProcessPoolExecutor
  39. import requests
  40. from util import date_util
  41. from util import platform_config_util ## 账号配置
  42. from util import robust_util
  43. from util.MySQLConnection import MySQLConnection
  44. def md5(s):
  45. md5 = hashlib.md5()
  46. md5.update(s.encode("utf-8"))
  47. return md5.hexdigest()
  48. def sha1(s):
  49. sha1 = hashlib.sha1()
  50. sha1.update(s.encode("utf-8"))
  51. return sha1.hexdigest()
  52. def get_random_str(num=5):
  53. H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
  54. salt = ''
  55. for i in range(num):
  56. salt += random.choice(H)
  57. return salt
  58. def getSelfDateStr(times=time.time(),date_format='%Y%m%d'):
  59. """
  60. ## 20201028添加,阳光接口,日期参数请求格式20201028,一日一拉api数据
  61. description: 获取指定时间戳
  62. time: 秒 默认当前时间
  63. return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
  64. tips: 一天86400秒
  65. """
  66. timestamps = str(time.strftime(date_format,time.localtime(times)))
  67. return timestamps
  68. def get_yangguang_account_list():
  69. """
  70. des cription: 阳光账号列表
  71. return: [['host_name', 'channel_id', 'secert_key', 'channel', 'stage']] ->list
  72. """
  73. return platform_config_util.get_account_list('阳光', 'yangguang_account_config.csv')
  74. @robust_util.catch_exception
  75. def get_yangguang_order(st,et,account_list):
  76. total_order_list = ()
  77. start_exec_seconds = date_util.getCurrentSecondTime()
  78. #account_list = get_yangguang_account_list()
  79. executor = ProcessPoolExecutor(max_workers=5)
  80. futures = []
  81. for account in account_list:
  82. future = executor.submit(get_yangguang_order_task, st, et, account)
  83. futures.append(future)
  84. executor.shutdown(True)
  85. for future in futures:
  86. order_list = future.result()
  87. if len(order_list) > 0:
  88. total_order_list = order_list + total_order_list
  89. print('阳光订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  90. return total_order_list
  91. def get_yangguang_order_task(st,et,account):
  92. order_list = ()
  93. url = 'https://api.yifengaf.cn:443/api/channeldataapi/orders'
  94. url_frequency = 0
  95. ##接口鉴权参数
  96. client_id = account[0]
  97. token = account[1]
  98. ##订单接口参数必填
  99. channel_id = account[3]
  100. ##需要保存的参数
  101. stage = account[2]
  102. channel = account[4]
  103. order_st_date = date_util.getSelfDateStr(int(st-86400),date_format='%Y-%m-%d')
  104. for i in range((et-st)//86400 + 1):
  105. last_id = 111
  106. statis_unix_time = st + (i-1)*86400
  107. search_date = getSelfDateStr(statis_unix_time,"%Y%m%d")
  108. while True:
  109. if st == et:
  110. break
  111. if url_frequency % 3 == 0 :
  112. time.sleep(61)
  113. url_frequency += 1
  114. nonce = get_random_str()
  115. timestamp = int(time.time())
  116. signaure = str(token)+str(timestamp)+str(client_id)+str(nonce)
  117. signaure = sha1(signaure)
  118. params = {
  119. ## 授权url参数
  120. 'client_id': client_id,
  121. 'token': token,
  122. 'nonce': nonce,
  123. 'timestamp': timestamp,
  124. 'signaure': signaure,
  125. ## 订单url参数,那个渠道,那天的数据
  126. 'channel_id': channel_id,
  127. 'search_date': search_date,
  128. }
  129. response_result_json = requests.get(url=url, params=params).json()
  130. code = response_result_json['error_code']
  131. if code != 0:
  132. # error_msg = response_result_json['error_msg']
  133. # print("阳光查询充值接口错误信息:",error_msg)
  134. print("阳光异常vip公众号:",channel,"所在期数:",stage)
  135. print('阳光查询充值接口异常:',response_result_json,'传入参数', params)
  136. break
  137. result_data = response_result_json['data']
  138. '''
  139. # json['data']返回数据案例 {'last_id': 108847045, 'count': 4, 'push_time': 1604303106,
  140. # 'list': [{'user_id': '531307203', 'merchant_id': '20201101111026_531307203_lYzh', 'transaction_id': '', 'type': '1', 'money': '18.00',
  141. # 'state': '0', 'from': '0', 'create_time': '1604200226', 'finish_time': '', 'referral_id': '3515889', 'referral_id_permanent': '3515894',
  142. # 'channel_id': '26885', 'book_name': '猛虎出山', 'book_tags': '都市', 'referral_url': '/t/3515889', 'subscribe_time': '1603687515',
  143. # 'user_createtime': '1603687515', 'openid': 'om90f6EKBtwcNo9gcgISKsGNyk5o', 'register_ip': '', 'nickname': '爱拼才会赢(星辉电镀化工)', 'avatar': url连接 } ]}
  144. '''
  145. page_count = result_data['count']
  146. if page_count == 0:
  147. break
  148. if last_id == result_data['last_id']:
  149. break
  150. last_id = result_data['last_id']
  151. params['last_id'] = result_data['last_id']
  152. order_item_list = result_data['list']
  153. for order_item in order_item_list:
  154. if int(order_item['state']) != 1:
  155. continue
  156. order = {}
  157. order['amount'] = order_item['money']
  158. order['channel_id'] = order_item['channel_id']
  159. order['order_id'] = str(order_item['transaction_id'])
  160. order['order_time'] = date_util.getSecondsToDatetime(int(order_item['create_time']),"%Y-%m-%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S
  161. order['user_id'] = order_item['user_id']
  162. order['platform'] = '阳光'
  163. order['channel'] = channel
  164. order['reg_time'] = date_util.getSecondsToDatetime(int(order_item['user_createtime']),"%Y-%m-%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S
  165. order['from_novel'] = order_item['book_name'] ## str
  166. order['stage'] = stage ## str
  167. order['date'] = (int(order_item['create_time']) + 8 * 3600) // 86400 * 86400 - 8 * 3600
  168. x = sorted(order.items(), key=lambda item: item[0])
  169. x = dict(x)
  170. x = tuple(x.values())
  171. order_list = order_list + ((x),)
  172. print(f"数据日期-{order_st_date}到{search_date}-公众号-{channel}-获取数据-{len(order_list)}条,例如: {order_list[0:1]}")
  173. return order_list
  174. def batch_save_order(data):
  175. if data is None or len(data) == 0:
  176. print('数据为空,不执行数据库操作!')
  177. else:
  178. sql = 'INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  179. connect = MySQLConnection()
  180. try:
  181. num = connect.batch(sql, data)
  182. # 提交
  183. connect.commit()
  184. print('订单数据最终入库【{num}】条'.format(num=num))
  185. except Exception as e:
  186. print('订单数据入库失败:', e)
  187. finally:
  188. connect.close()
  189. def batch_save_order_new(data):
  190. if data is None or len(data) == 0:
  191. print('数据为空,不执行数据库操作!')
  192. else:
  193. sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  194. connect = MySQLConnection()
  195. try:
  196. num = connect.batch(sql, data)
  197. # 提交
  198. connect.commit()
  199. print('订单数据最终入库【{num}】条'.format(num=num))
  200. except Exception as e:
  201. print('订单数据入库失败:', e)
  202. finally:
  203. connect.close()
  204. def start_order_job():
  205. start_exec_seconds = date_util.getCurrentSecondTime()
  206. st_unix, et_unix = date_util.getPreviousHourAndCurrentHourSecondTime(start_exec_seconds)
  207. account_list = get_yangguang_account_list()
  208. # st_unix = 1602313200 # 2020/10/10 15:0:0
  209. # et_unix = 1602316800 # 2020/10/10 16:0:0
  210. #print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
  211. #print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))
  212. ## 20201028添加阳光平台
  213. batch_save_order(get_yangguang_order(st_unix, et_unix, account_list))
  214. batch_save_order_new(get_yangguang_order(st_unix, et_unix, account_list))
  215. print('订单同步执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  216. if __name__ == '__main__':
  217. print(date_util.getCurrentFormatTimeStr())
  218. start_order_job()