from model.DataBaseUtils import MysqlUtils from model.DingTalkUtils import DingTalkDecorators,DingTalkUtils from model.DateUtils import DateUtils from model import ComUtils import time import json import requests db = MysqlUtils() du = DateUtils() @DingTalkDecorators("阳光") def yangguang(start=None, end=None): accounts = get_account("阳光") if start: start = start + ' 00:00:00' end = end + ' 23:59:59' else: start = du.getTodayOrYestoday() + ' 00:00:00' end = du.get_n_hours_ago(0) client_id = 10008097 token = '2xa1d55tTPBjeEA8Ho' if accounts.__len__() == 0: return else: print(f"阳光账号数:{accounts.__len__()}") for i in accounts: stage = i[0] vip_id = i[1] print(stage,vip_id) print(vip_id) # get_yg_vip_channel(stage, vip_id, client_id, token) get_yg_data(stage, vip_id, client_id, token, start, end) print(check()) parse_order_data() def get_yg_data(stage,vip_id,client_id,token,start,end): url = "https://data.yifengaf.cn:443/channeldata/data/orders/list" nonce=ComUtils.get_random_str() timestamp=int(time.time()) signaure =ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, "start_time":start, # %Y-%m-%d %H:%i:%s: "end_time":end } headers={"Content-Type":"application/json"} for i in range(5): try: r=requests.post(url=url,data=json.dumps(params),headers=headers,timeout=5) break except: pass raise print(vip_id,r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')") def get_yg_vip_channel(stage, vip_id, client_id, token): url = "https://data.yifengaf.cn:443/channeldata/data/account/list" nonce = ComUtils.get_random_str() timestamp = int(time.time()) signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, } headers = {"Content-Type": "application/json"} r = requests.post(url=url, data=json.dumps(params), headers=headers) print(r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute( f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')") def get_channel_info(): accounts = get_account("阳光") client_id = 10008097 token = '2xa1d55tTPBjeEA8Ho' for i in accounts: stage = i[0] vip_id = i[1] print(vip_id) get_yg_vip_channel(stage, vip_id, client_id, token) def parse_order_data(): print(111) accounts = get_account("阳光") for i in accounts: # print(i) vip_id = i[1] stage = i[0] data = parse_order(vip_id, stage) save_data(data) def parse_order(vip_id,stage): print(vip_id) url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ") for i in range(5): try: r = requests.get(url,timeout=5).text break except: pass raise channel_di={} a = r.split('}') for i in a[:-1]: if i[-1] != '}': b=json.loads(i + "}", strict=False) else: b=json.loads(i, strict=False) channel_di[b["channel_id"]]=(b["wx_nickname"],b['app_id']) # print(channel_di) print(f'{stage} 有channel数:{len(channel_di)}') info=db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}") stage=info[0][0] path=info[0][1] for i in range(5): try: text=requests.get(path,timeout=5).text.replace('"referral_url":,','') # print(text) break except: pass raise insert_data=[] for j in text.split("}")[:-1]: if j[-1] != '}': j=j+'}' try: di=json.loads(j, strict=False) except Exception as e: print(j) print(e) insert_data.append(( di["create_time"][:10], stage, '阳光', channel_di[di['channel_id']][0], di['channel_id'], di['openid'], di['create_time'], di['user_createtime'], di['money'], di.get('book_name'), di['merchant_id'], 2 if di['state']=="完成" else 1, di['user_id'], channel_di[di['channel_id']][1], 1 if di['type'] == '书币充值' else 2, di['merchant_id'], di['transaction_id'] )) # print(insert_data) # exit(0) print("订单数:"+ str(insert_data.__len__())) save_data(insert_data) def save_data(data): sql = """replace into ods_order(date,stage,platform,channel,channel_id,user_id,order_time, reg_time,amount,from_novel,order_id,status,platform_user_id,wechat_app_id,order_type, trade_no,transaction_no) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """ # print(sql) db.quchen_text.executeMany(sql,data) def check(): x = 1 while True: a = db.quchen_text.getOne("select count(1) from yangguang_path where type ='order' and path is null") print(f" 回调接口 待处理数量 {a} ") if a == 0: info = '回调完成' break time.sleep(60) x += 1 if x > 10: DingTalkUtils().send('阳光订单回调延时10min', '15168342316') info = '回调未完成' break return info def get_account(plactform, id=None): op = f" and id={id} " if id else '' data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}") new_data = [] for i in data: new_data.append(i[0].replace('\n', '').split(",")) return new_data def get_yg_user_info(): """获取阳光的用户信息""" accounts = get_account("阳光") # if start: # start = start + ' 00:00:00' # end = end + ' 23:59:59' # else: # start = du.getTodayOrYestoday() + ' 00:00:00' # end = du.get_n_hours_ago(0) client_id = 10008097 token = '2xa1d55tTPBjeEA8Ho' if accounts.__len__() == 0: return else: print(f"阳光账号数:{accounts.__len__()}") for i in accounts: stage = i[0] vip_id = i[1] print(vip_id) # get_yg_vip_channel(stage, vip_id, client_id, token) get_yg_data(stage, vip_id, client_id, token, start, end) print(check()) # parse_data() def daily_yg(): st = du.get_n_days(-10) et = du.get_n_days(-1) yangguang(st, et) if __name__ == '__main__': # get_channel_info() # exit(0) yangguang(start=du.get_n_days(-1),end=du.get_n_days(0)) # # exit(0) # yangguang('2021-05-28','2021-05-28') # daily_yg() # for i in du.split_date2('2020-06-28','2020-11-03',30): # print(i) # yangguang(i[0], i[1]) # parse_order_data() # get_channel_info() # yangguang(start=du.get_n_days(-10),end=du.get_n_days(0))