yangguang.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. from model.DataBaseUtils import MysqlUtils
  2. from model.DingTalkUtils import DingTalkDecorators, DingTalkUtils
  3. from model.DateUtils import DateUtils
  4. from model import ComUtils
  5. import time
  6. import json
  7. import requests
  8. db = MysqlUtils()
  9. du = DateUtils()
  10. @DingTalkDecorators("阳光")
  11. def yangguang(start=None, end=None):
  12. get_channel_info()
  13. accounts = get_account("阳光")
  14. if start:
  15. start = start + ' 00:00:00'
  16. end = end + ' 23:59:59'
  17. else:
  18. start = du.getTodayOrYestoday() + ' 00:00:00'
  19. end = du.get_n_hours_ago(0)
  20. client_id = 10008097
  21. token = '2xa1d55tTPBjeEA8Ho'
  22. if accounts.__len__() == 0:
  23. return
  24. else:
  25. print(f"阳光账号数:{accounts.__len__()}")
  26. for i in accounts:
  27. stage = i[0]
  28. vip_id = i[1]
  29. print(stage, vip_id)
  30. print(vip_id)
  31. # get_yg_vip_channel(stage, vip_id, client_id, token)
  32. get_yg_data(stage, vip_id, client_id, token, start, end)
  33. print(check())
  34. parse_order_data()
  35. def get_yg_data(stage, vip_id, client_id, token, start, end):
  36. url = "https://data.yifengaf.cn:443/channeldata/data/orders/list"
  37. nonce = ComUtils.get_random_str()
  38. timestamp = int(time.time())
  39. signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
  40. params = {
  41. "client_id": client_id,
  42. "token": token,
  43. "nonce": nonce,
  44. "timestamp": timestamp,
  45. "signaure": signaure,
  46. "vip_id": vip_id,
  47. "start_time": start, # %Y-%m-%d %H:%i:%s:
  48. "end_time": end
  49. }
  50. headers = {"Content-Type": "application/json"}
  51. for i in range(5):
  52. try:
  53. r = requests.post(url=url, data=json.dumps(params), headers=headers, timeout=5)
  54. break
  55. except:
  56. pass
  57. raise
  58. print(vip_id, r.text)
  59. task_id = json.loads(r.text).get("data").get("task_id")
  60. db.quchen_text.execute(
  61. f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")
  62. def get_yg_vip_channel(stage, vip_id, client_id, token):
  63. url = "https://data.yifengaf.cn:443/channeldata/data/account/list"
  64. nonce = ComUtils.get_random_str()
  65. timestamp = int(time.time())
  66. signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
  67. params = {
  68. "client_id": client_id,
  69. "token": token,
  70. "nonce": nonce,
  71. "timestamp": timestamp,
  72. "signaure": signaure,
  73. "vip_id": vip_id,
  74. }
  75. headers = {"Content-Type": "application/json"}
  76. r = requests.post(url=url, data=json.dumps(params), headers=headers)
  77. print(r.text)
  78. task_id = json.loads(r.text).get("data").get("task_id")
  79. db.quchen_text.execute(
  80. f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")
  81. def get_channel_info():
  82. accounts = get_account("阳光")
  83. client_id = 10008097
  84. token = '2xa1d55tTPBjeEA8Ho'
  85. for i in accounts:
  86. stage = i[0]
  87. vip_id = i[1]
  88. print(vip_id)
  89. get_yg_vip_channel(stage, vip_id, client_id, token)
  90. def parse_order_data():
  91. print(111)
  92. accounts = get_account("阳光")
  93. for i in accounts:
  94. # print(i)
  95. try:
  96. vip_id = i[1]
  97. stage = i[0]
  98. data = parse_order(vip_id, stage)
  99. save_data(data)
  100. except Exception as e:
  101. DingTalkUtils.send('阳光出错vipid:' + vip_id, phone='15168342316')
  102. print(e)
  103. def parse_order(vip_id, stage):
  104. print(vip_id)
  105. url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
  106. for i in range(5):
  107. try:
  108. r = requests.get(url, timeout=5).text
  109. print(r)
  110. break
  111. except:
  112. pass
  113. raise
  114. channel_di = {}
  115. a = r.split('}')
  116. for i in a[:-1]:
  117. if i[-1] != '}':
  118. b = json.loads(i + "}", strict=False)
  119. else:
  120. b = json.loads(i, strict=False)
  121. channel_di[b["channel_id"]] = (b["wx_nickname"], b['app_id'])
  122. # print(channel_di)
  123. print(f'{stage} 有channel数:{len(channel_di)}')
  124. info = db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
  125. stage = info[0][0]
  126. path = info[0][1]
  127. for i in range(5):
  128. try:
  129. text = requests.get(path, timeout=5).text.replace('"referral_url":,', '')
  130. print(text)
  131. break
  132. except Exception as e:
  133. print('channel', e)
  134. # raise
  135. insert_data = []
  136. for j in text.split("}")[:-1]:
  137. if j[-1] != '}':
  138. j = j + '}'
  139. try:
  140. di = json.loads(j, strict=False)
  141. except Exception as e:
  142. print(j)
  143. print(e)
  144. insert_data.append((
  145. di["create_time"][:10],
  146. stage,
  147. '阳光',
  148. channel_di[di['channel_id']][0],
  149. di['channel_id'],
  150. di['openid'],
  151. di['create_time'],
  152. di['user_createtime'],
  153. di['money'],
  154. di.get('book_name'),
  155. di['merchant_id'],
  156. 2 if di['state'] == "完成" else 1,
  157. di['user_id'],
  158. channel_di[di['channel_id']][1],
  159. 1 if di['type'] == '书币充值' else 2,
  160. di['merchant_id'],
  161. di['transaction_id']
  162. ))
  163. # print(insert_data)
  164. # exit(0)
  165. print("订单数:" + str(insert_data.__len__()))
  166. save_data(insert_data)
  167. def save_data(data):
  168. sql = """replace into ods_order(date,stage,platform,channel,channel_id,user_id,order_time,
  169. reg_time,amount,from_novel,order_id,status,platform_user_id,wechat_app_id,order_type,
  170. trade_no,transaction_no) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
  171. # print(sql)
  172. db.quchen_text.executeMany(sql, data)
  173. def check():
  174. x = 1
  175. while True:
  176. a = db.quchen_text.getOne("select count(1) from yangguang_path where type ='order' and path is null")
  177. print(f" 回调接口 待处理数量 {a} ")
  178. if a == 0:
  179. info = '回调完成'
  180. break
  181. time.sleep(60)
  182. x += 1
  183. if x > 10:
  184. DingTalkUtils().send('阳光订单回调延时10min', '15168342316')
  185. info = '回调未完成'
  186. break
  187. return info
  188. def get_account(plactform, id=None):
  189. op = f" and id={id} " if id else ''
  190. data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}")
  191. new_data = []
  192. for i in data:
  193. new_data.append(i[0].replace('\n', '').split(","))
  194. return new_data
  195. def daily_yg():
  196. st = du.get_n_days(-10)
  197. et = du.get_n_days(-1)
  198. yangguang(st, et)
  199. if __name__ == '__main__':
  200. # get_channel_info()
  201. # exit(0)
  202. yangguang(start=du.get_n_days(-29), end=du.get_n_days(0))
  203. # yangguang('2021-05-28','2021-05-28')
  204. # daily_yg()
  205. # for i in du.split_date2('2020-06-28','2020-11-03',30):
  206. # print(i)
  207. # yangguang(i[0], i[1])
  208. # parse_order_data()
  209. # get_channel_info()
  210. # yangguang(start=du.get_n_days(-1),end=du.get_n_days(0))
  211. parse_order('29600', stage='趣程27期')