get_order_dairly_yangguang.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # 20201028
# 2020-10-27: added the Yangguang (阳光) platform API.
########################## Response format of the Yangguang order API
'''
{
    "error_code": 0,                 ## error code; 0 means success
    "error_msg": "",                 ## error message
    "data": {                        ## payload
        "last_id": 1,                ## ID of the last record; pass it when requesting the next page
        "count": 500,                ## number of records returned by this query
        "push_time": 1570701536,     ## response timestamp
        "list": [                    ## list of records
            {
                "merchant_id": "20180627151621_1_UdyF",            ## merchant order number
                "transaction_id": "4200000122201806277014647642",  ## transaction number
                "type": "1",                  ## order type: '1' => book-coin top-up, '2' => VIP top-up
                "money": "50",                ## total amount
                "state": "1",                 ## order state: '0' => not finished, '1' => finished
                "from": "0",                  ##
                "create_time": "1530083789",  ## time the order was placed
                "finish_time": "1530083789",  ## time the order was completed
                "book_name": "一品邪少",       ## book title
                "book_tags": "现代都市",       ##
                "referral_url": "/t/392109",  ## promotion link
                "user_id": "112333",          ## user id
                "channel_id": "1231",         ## channel id
                "user_createtime": "...",     ## user registration time
                "openid": "..."               ## user openid
            },
        ]
    }
}
'''
##########################
  35. import random
  36. import datetime
  37. import hashlib
  38. import math
  39. import time
  40. from concurrent.futures import ProcessPoolExecutor
  41. from urllib import parse
  42. import requests
  43. from util import date_util
  44. from util import platform_config_util ## 账号配置
  45. from util import robust_util
  46. from apscheduler.schedulers.blocking import BlockingScheduler
  47. from util.MySQLConnection import MySQLConnection
  48. def md5(s):
  49. md5 = hashlib.md5()
  50. md5.update(s.encode("utf-8"))
  51. return md5.hexdigest()
  52. def sha1(s):
  53. sha1 = hashlib.sha1()
  54. sha1.update(s.encode("utf-8"))
  55. return sha1.hexdigest()
  56. def get_random_str(num=5):
  57. H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
  58. salt = ''
  59. for i in range(num):
  60. salt += random.choice(H)
  61. return salt
  62. def getSelfDateStr(times=time.time(),date_format='%Y%m%d'):
  63. """
  64. ## 20201028添加,阳光接口,日期参数请求格式20201028,一日一拉api数据
  65. description: 获取指定时间戳
  66. time: 秒 默认当前时间
  67. return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
  68. tips: 一天86400秒
  69. """
  70. timestamps = str(time.strftime(date_format,time.localtime(times)))
  71. return timestamps
  72. def get_yangguang_account_list():
  73. """
  74. des cription: 阳光账号列表
  75. return: [['host_name', 'channel_id', 'secert_key', 'channel', 'stage']] ->list
  76. """
  77. return platform_config_util.get_account_list('阳光', 'yangguang_account_config.csv')
  78. @robust_util.catch_exception
  79. def get_yangguang_order(st,et,account_list):
  80. total_order_list = ()
  81. start_exec_seconds = date_util.getCurrentSecondTime()
  82. #account_list = get_yangguang_account_list()
  83. executor = ProcessPoolExecutor(max_workers=5)
  84. futures = []
  85. for account in account_list:
  86. future = executor.submit(get_yangguang_order_task, st, et, account)
  87. futures.append(future)
  88. executor.shutdown(True)
  89. for future in futures:
  90. order_list = future.result()
  91. if len(order_list) > 0:
  92. total_order_list = order_list + total_order_list
  93. print('阳光订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  94. return total_order_list
  95. def get_yangguang_order_task(st,et,account):
  96. order_list = ()
  97. url = 'https://api.yifengaf.cn:443/api/channeldataapi/orders'
  98. url_frequency = 0
  99. ##接口鉴权参数
  100. client_id = account[0]
  101. token = account[1]
  102. ##订单接口参数必填
  103. channel_id = account[3]
  104. ##需要保存的参数
  105. stage = account[2]
  106. channel = account[4]
  107. order_st_date = date_util.getSelfDateStr(int(st-86400),date_format='%Y-%m-%d')
  108. for i in range((et-st)//86400 + 1):
  109. last_id = 111
  110. statis_unix_time = st + (i-1)*86400
  111. search_date = getSelfDateStr(statis_unix_time,"%Y%m%d")
  112. while True:
  113. if st == et:
  114. break
  115. if url_frequency % 3 == 0 :
  116. time.sleep(61)
  117. url_frequency += 1
  118. nonce = get_random_str()
  119. timestamp = int(time.time())
  120. signaure = str(token)+str(timestamp)+str(client_id)+str(nonce)
  121. signaure = sha1(signaure)
  122. params = {
  123. ## 授权url参数
  124. 'client_id': client_id,
  125. 'token': token,
  126. 'nonce': nonce,
  127. 'timestamp': timestamp,
  128. 'signaure': signaure,
  129. ## 订单url参数,那个渠道,那天的数据
  130. 'channel_id': channel_id,
  131. 'search_date': search_date,
  132. }
  133. response_result_json = requests.get(url=url, params=params).json()
  134. code = response_result_json['error_code']
  135. if code != 0:
  136. # error_msg = response_result_json['error_msg']
  137. # print("阳光查询充值接口错误信息:",error_msg)
  138. print("阳光异常vip公众号:",channel,"所在期数:",stage)
  139. print('阳光查询充值接口异常:',response_result_json,'传入参数', params)
  140. break
  141. result_data = response_result_json['data']
  142. '''
  143. # json['data']返回数据案例 {'last_id': 108847045, 'count': 4, 'push_time': 1604303106,
  144. # 'list': [{'user_id': '531307203', 'merchant_id': '20201101111026_531307203_lYzh', 'transaction_id': '', 'type': '1', 'money': '18.00',
  145. # 'state': '0', 'from': '0', 'create_time': '1604200226', 'finish_time': '', 'referral_id': '3515889', 'referral_id_permanent': '3515894',
  146. # 'channel_id': '26885', 'book_name': '猛虎出山', 'book_tags': '都市', 'referral_url': '/t/3515889', 'subscribe_time': '1603687515',
  147. # 'user_createtime': '1603687515', 'openid': 'om90f6EKBtwcNo9gcgISKsGNyk5o', 'register_ip': '', 'nickname': '爱拼才会赢(星辉电镀化工)', 'avatar': url连接 } ]}
  148. '''
  149. page_count = result_data['count']
  150. if page_count == 0:
  151. break
  152. if last_id == result_data['last_id']:
  153. break
  154. last_id = result_data['last_id']
  155. params['last_id'] = result_data['last_id']
  156. order_item_list = result_data['list']
  157. for order_item in order_item_list:
  158. if int(order_item['state']) != 1:
  159. continue
  160. order = {}
  161. order['amount'] = order_item['money']
  162. order['channel_id'] = order_item['channel_id']
  163. order['order_id'] = str(order_item['transaction_id'])
  164. order['order_time'] = date_util.getSecondsToDatetime(int(order_item['create_time']),"%Y-%m-%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S
  165. order['user_id'] = order_item['user_id']
  166. order['platform'] = '阳光'
  167. order['channel'] = channel
  168. order['reg_time'] = date_util.getSecondsToDatetime(int(order_item['user_createtime']),"%Y-%m-%d %H:%M:%S") ## 原数据时间戳,转日期默认=%Y-%m-%d %H:%M:%S
  169. order['from_novel'] = order_item['book_name'] ## str
  170. order['stage'] = stage ## str
  171. order['date'] = (int(order_item['create_time']) + 8 * 3600) // 86400 * 86400 - 8 * 3600
  172. x = sorted(order.items(), key=lambda item: item[0])
  173. x = dict(x)
  174. x = tuple(x.values())
  175. order_list = order_list + ((x),)
  176. print(f"数据日期-{order_st_date}到{search_date}-公众号-{channel}-获取数据-{len(order_list)}条,例如: {order_list[0:1]}")
  177. return order_list
  178. def batch_save_order(data):
  179. if data is None or len(data) == 0:
  180. print('数据为空,不执行数据库操作!')
  181. else:
  182. sql = 'INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
  183. connect = MySQLConnection()
  184. try:
  185. num = connect.batch(sql, data)
  186. # 提交
  187. connect.commit()
  188. print('订单数据最终入库【{num}】条'.format(num=num))
  189. except Exception as e:
  190. print('订单数据入库失败:', e)
  191. finally:
  192. connect.close()
  193. def start_order_job():
  194. start_exec_seconds = date_util.getCurrentSecondTime()
  195. st_unix, et_unix = date_util.getPreviousHourAndCurrentHourSecondTime(start_exec_seconds)
  196. account_list = get_yangguang_account_list()
  197. # st_unix = 1602313200 # 2020/10/10 15:0:0
  198. # et_unix = 1602316800 # 2020/10/10 16:0:0
  199. #print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
  200. #print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))
  201. ## 20201028添加阳光平台
  202. batch_save_order(get_yangguang_order(st_unix, et_unix, account_list))
  203. print('订单同步执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
  204. #if __name__ == '__main__':
  205. # start_order_job()
  206. start_job_time = '2020-11-17 01:20:00'
  207. if __name__ == '__main__':
  208. scheduler = BlockingScheduler()
  209. scheduler.add_job(start_order_job, 'interval', days =1 ,start_date=start_job_time)
  210. scheduler.start()