yangguang.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. from model.DataBaseUtils import MysqlUtils
  2. from model.DingTalkUtils import DingTalkDecorators, DingTalkUtils
  3. from model.DateUtils import DateUtils
  4. from model import ComUtils
  5. import time
  6. import json
  7. import requests
  8. db = MysqlUtils()
  9. du = DateUtils()
  10. @DingTalkDecorators("阳光")
  11. def yangguang(start=None, end=None):
  12. accounts = get_account("阳光")
  13. if start:
  14. start = start + ' 00:00:00'
  15. end = end + ' 23:59:59'
  16. else:
  17. start = du.getTodayOrYestoday() + ' 00:00:00'
  18. end = du.get_n_hours_ago(0)
  19. client_id = 10008097
  20. token = '2xa1d55tTPBjeEA8Ho'
  21. if accounts.__len__() == 0:
  22. return
  23. else:
  24. print(f"阳光账号数:{accounts.__len__()}")
  25. for i in accounts:
  26. stage = i[0]
  27. vip_id = i[1]
  28. print(stage, vip_id)
  29. print(vip_id)
  30. # get_yg_vip_channel(stage, vip_id, client_id, token)
  31. get_yg_data(stage, vip_id, client_id, token, start, end)
  32. print(check())
  33. parse_order_data()
  34. def get_yg_data(stage, vip_id, client_id, token, start, end):
  35. url = "https://data.yifengaf.cn:443/channeldata/data/orders/list"
  36. nonce = ComUtils.get_random_str()
  37. timestamp = int(time.time())
  38. signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
  39. params = {
  40. "client_id": client_id,
  41. "token": token,
  42. "nonce": nonce,
  43. "timestamp": timestamp,
  44. "signaure": signaure,
  45. "vip_id": vip_id,
  46. "start_time": start, # %Y-%m-%d %H:%i:%s:
  47. "end_time": end
  48. }
  49. headers = {"Content-Type": "application/json"}
  50. for i in range(5):
  51. try:
  52. r = requests.post(url=url, data=json.dumps(params), headers=headers, timeout=5)
  53. break
  54. except:
  55. pass
  56. raise
  57. print(vip_id, r.text)
  58. task_id = json.loads(r.text).get("data").get("task_id")
  59. db.quchen_text.execute(
  60. f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")
  61. def get_yg_vip_channel(stage, vip_id, client_id, token):
  62. url = "https://data.yifengaf.cn:443/channeldata/data/account/list"
  63. nonce = ComUtils.get_random_str()
  64. timestamp = int(time.time())
  65. signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
  66. params = {
  67. "client_id": client_id,
  68. "token": token,
  69. "nonce": nonce,
  70. "timestamp": timestamp,
  71. "signaure": signaure,
  72. "vip_id": vip_id,
  73. }
  74. headers = {"Content-Type": "application/json"}
  75. r = requests.post(url=url, data=json.dumps(params), headers=headers)
  76. print(r.text)
  77. task_id = json.loads(r.text).get("data").get("task_id")
  78. db.quchen_text.execute(
  79. f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")
  80. def get_channel_info():
  81. accounts = get_account("阳光")
  82. client_id = 10008097
  83. token = '2xa1d55tTPBjeEA8Ho'
  84. for i in accounts:
  85. stage = i[0]
  86. vip_id = i[1]
  87. print(vip_id)
  88. get_yg_vip_channel(stage, vip_id, client_id, token)
  89. def parse_order_data():
  90. print(111)
  91. accounts = get_account("阳光")
  92. for i in accounts:
  93. # print(i)
  94. try:
  95. vip_id = i[1]
  96. stage = i[0]
  97. data = parse_order(vip_id, stage)
  98. save_data(data)
  99. except Exception as e:
  100. DingTalkUtils.send('阳光出错vipid:' + vip_id, phone='15168342316')
  101. print(e)
  102. def parse_order(vip_id, stage):
  103. print(vip_id)
  104. url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
  105. for i in range(5):
  106. try:
  107. r = requests.get(url, timeout=5).text
  108. print(r)
  109. break
  110. except:
  111. pass
  112. raise
  113. channel_di = {}
  114. a = r.split('}')
  115. for i in a[:-1]:
  116. if i[-1] != '}':
  117. b = json.loads(i + "}", strict=False)
  118. else:
  119. b = json.loads(i, strict=False)
  120. channel_di[b["channel_id"]] = (b["wx_nickname"], b['app_id'])
  121. # print(channel_di)
  122. print(f'{stage} 有channel数:{len(channel_di)}')
  123. info = db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
  124. stage = info[0][0]
  125. path = info[0][1]
  126. for i in range(5):
  127. try:
  128. text = requests.get(path, timeout=5).text.replace('"referral_url":,', '')
  129. print(text)
  130. break
  131. except Exception as e:
  132. print('channel', e)
  133. # raise
  134. insert_data = []
  135. for j in text.split("}")[:-1]:
  136. if j[-1] != '}':
  137. j = j + '}'
  138. try:
  139. di = json.loads(j, strict=False)
  140. except Exception as e:
  141. print(j)
  142. print(e)
  143. insert_data.append((
  144. di["create_time"][:10],
  145. stage,
  146. '阳光',
  147. channel_di[di['channel_id']][0],
  148. di['channel_id'],
  149. di['openid'],
  150. di['create_time'],
  151. di['user_createtime'],
  152. di['money'],
  153. di.get('book_name'),
  154. di['merchant_id'],
  155. 2 if di['state'] == "完成" else 1,
  156. di['user_id'],
  157. channel_di[di['channel_id']][1],
  158. 1 if di['type'] == '书币充值' else 2,
  159. di['merchant_id'],
  160. di['transaction_id']
  161. ))
  162. # print(insert_data)
  163. # exit(0)
  164. print("订单数:" + str(insert_data.__len__()))
  165. save_data(insert_data)
  166. def save_data(data):
  167. sql = """replace into ods_order(date,stage,platform,channel,channel_id,user_id,order_time,
  168. reg_time,amount,from_novel,order_id,status,platform_user_id,wechat_app_id,order_type,
  169. trade_no,transaction_no) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
  170. # print(sql)
  171. db.quchen_text.executeMany(sql, data)
  172. def check():
  173. x = 1
  174. while True:
  175. a = db.quchen_text.getOne("select count(1) from yangguang_path where type ='order' and path is null")
  176. print(f" 回调接口 待处理数量 {a} ")
  177. if a == 0:
  178. info = '回调完成'
  179. break
  180. time.sleep(60)
  181. x += 1
  182. if x > 10:
  183. DingTalkUtils().send('阳光订单回调延时10min', '15168342316')
  184. info = '回调未完成'
  185. break
  186. return info
  187. def get_account(plactform, id=None):
  188. op = f" and id={id} " if id else ''
  189. data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}")
  190. new_data = []
  191. for i in data:
  192. new_data.append(i[0].replace('\n', '').split(","))
  193. return new_data
  194. def daily_yg():
  195. st = du.get_n_days(-10)
  196. et = du.get_n_days(-1)
  197. yangguang(st, et)
  198. if __name__ == '__main__':
  199. # get_channel_info()
  200. # exit(0)
  201. # yangguang(start=du.get_n_days(-1), end=du.get_n_days(0))
  202. # yangguang('2021-05-28','2021-05-28')
  203. # daily_yg()
  204. # for i in du.split_date2('2020-06-28','2020-11-03',30):
  205. # print(i)
  206. # yangguang(i[0], i[1])
  207. # parse_order_data()
  208. # get_channel_info()
  209. # yangguang(start=du.get_n_days(-10),end=du.get_n_days(0))
  210. parse_order('29600', stage='趣程27期')