"""Pull order data for the "阳光" (Yangguang) platform into ``ods_order``.

Flow visible in this module:

1. ``yangguang`` asks the remote API to start one order-export task per
   account and records each task in ``yangguang_path``.
2. ``check`` polls ``yangguang_path`` until no ``type='order'`` row has a
   null ``path`` (presumably the path is filled in by a callback handler
   that lives outside this file -- TODO confirm).
3. ``parse_order_data`` downloads the exported files, joins orders with
   channel metadata and writes the rows through ``save_data``.
"""
import json
import time

import requests

from model import ComUtils
from model.DataBaseUtils import MysqlUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkDecorators, DingTalkUtils

db = MysqlUtils()
du = DateUtils()

# API credentials shared by every entry point in this module.
# NOTE(review): the token is committed in source; consider moving it to
# configuration or an environment variable.
_YG_CLIENT_ID = 10008097
_YG_TOKEN = '2xa1d55tTPBjeEA8Ho'

_ORDERS_URL = "https://data.yifengaf.cn:443/channeldata/data/orders/list"
_ACCOUNTS_URL = "https://data.yifengaf.cn:443/channeldata/data/account/list"
_JSON_HEADERS = {"Content-Type": "application/json"}

_RETRY_ATTEMPTS = 5
_REQUEST_TIMEOUT = 5  # seconds, applied to every HTTP call


def _call_with_retry(send, **kwargs):
    """Call ``send(**kwargs)`` up to ``_RETRY_ATTEMPTS`` times.

    ``send`` is ``requests.get`` or ``requests.post``. The response of the
    first successful attempt is returned; if every attempt fails, the last
    ``requests.RequestException`` is re-raised.
    """
    last_error = None
    for _ in range(_RETRY_ATTEMPTS):
        try:
            return send(timeout=_REQUEST_TIMEOUT, **kwargs)
        except requests.RequestException as err:
            last_error = err
    raise last_error


def _signed_params(vip_id, client_id, token, **extra):
    """Build the signed request body for the export API.

    The signature is the SHA-1 (via ``ComUtils.sha1``) of
    ``token + timestamp + client_id + nonce``. ``extra`` entries are
    appended after the common fields.
    """
    nonce = ComUtils.get_random_str()
    timestamp = int(time.time())
    signature = ComUtils.sha1(
        str(token) + str(timestamp) + str(client_id) + str(nonce))
    params = {
        "client_id": client_id,
        "token": token,
        "nonce": nonce,
        "timestamp": timestamp,
        # Key spelling ("signaure") is kept exactly as the original request
        # sent it -- presumably this is what the API expects.
        "signaure": signature,
        "vip_id": vip_id,
    }
    params.update(extra)
    return params


def _iter_json_objects(text):
    """Yield the raw text of each ``{...}`` record in ``text``.

    The export files are handled as a sequence of flat JSON objects, so the
    text is cut at every ``}`` and the brace is restored on each piece. The
    remainder after the final ``}`` is dropped and blank pieces are skipped.
    """
    for piece in text.split("}")[:-1]:
        if piece.strip():
            yield piece + "}"


def _resolve_window(start, end):
    """Return the ``(start_time, end_time)`` strings for an export request.

    With an explicit ``start`` (and ``end``) date the window covers whole
    days; otherwise it runs from ``du.getTodayOrYestoday()`` at midnight to
    ``du.get_n_hours_ago(0)``.
    """
    if start:
        return start + ' 00:00:00', end + ' 23:59:59'
    return du.getTodayOrYestoday() + ' 00:00:00', du.get_n_hours_ago(0)


@DingTalkDecorators("阳光")
def yangguang(start=None, end=None):
    """Request, await and load Yangguang orders for every account.

    Args:
        start: first day as ``YYYY-MM-DD``; when falsy the default window
            from ``_resolve_window`` is used.
        end: last day as ``YYYY-MM-DD``; required when ``start`` is given.

    Channel metadata is not requested here; ``parse_order`` reads the
    ``type='channel'`` rows that ``get_channel_info`` creates.
    """
    accounts = get_account("阳光")
    start, end = _resolve_window(start, end)
    if not accounts:
        return
    print(f"阳光账号数:{len(accounts)}")

    for account in accounts:
        stage = account[0]
        vip_id = account[1]
        print(stage, vip_id)
        print(vip_id)
        get_yg_data(stage, vip_id, _YG_CLIENT_ID, _YG_TOKEN, start, end)

    print(check())
    parse_order_data()


def get_yg_data(stage, vip_id, client_id, token, start, end):
    """Start an order-export task for one account and record its task id.

    Args:
        stage: stage label stored alongside the task.
        vip_id: account id on the platform.
        client_id, token: API credentials used for signing.
        start, end: window bounds, ``%Y-%m-%d %H:%i:%s`` formatted strings.

    Raises:
        requests.RequestException: if every retry of the POST fails.
    """
    params = _signed_params(
        vip_id, client_id, token, start_time=start, end_time=end)
    r = _call_with_retry(
        requests.post, url=_ORDERS_URL, data=json.dumps(params),
        headers=_JSON_HEADERS)
    print(vip_id, r.text)
    task_id = json.loads(r.text).get("data").get("task_id")
    # NOTE(review): SQL is built by string interpolation; switch to bound
    # parameters if db.quchen_text.execute supports them.
    db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")


def get_yg_vip_channel(stage, vip_id, client_id, token):
    """Start a channel/account-list export task and record its task id.

    Uses the same timeout and retry policy as ``get_yg_data``.

    Raises:
        requests.RequestException: if every retry of the POST fails.
    """
    params = _signed_params(vip_id, client_id, token)
    r = _call_with_retry(
        requests.post, url=_ACCOUNTS_URL, data=json.dumps(params),
        headers=_JSON_HEADERS)
    print(r.text)
    task_id = json.loads(r.text).get("data").get("task_id")
    # NOTE(review): string-built SQL, see get_yg_data.
    db.quchen_text.execute(
        f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")


def get_channel_info():
    """Request a channel export task for every Yangguang account."""
    for account in get_account("阳光"):
        stage = account[0]
        vip_id = account[1]
        print(vip_id)
        get_yg_vip_channel(stage, vip_id, _YG_CLIENT_ID, _YG_TOKEN)


def parse_order_data():
    """Download, parse and store the exported orders of every account.

    ``parse_order`` already persists its rows, so nothing is saved again
    here (the original called ``save_data`` a second time with ``None``).
    """
    print(111)
    for account in get_account("阳光"):
        parse_order(account[1], account[0])


def parse_order(vip_id, stage):
    """Parse one account's exported orders and write them to ``ods_order``.

    Args:
        vip_id: account id used to look up the export paths.
        stage: stage label, used only in the channel-count message; the
            stage stored with each row comes from ``yangguang_path``.

    Returns:
        The list of row tuples passed to ``save_data``.

    Raises:
        requests.RequestException: if a download keeps failing.
        KeyError: if an order references a channel missing from the channel
            export, or a record lacks an expected field.
    """
    print(vip_id)
    channel_url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
    channel_text = _call_with_retry(requests.get, url=channel_url).text

    # channel_id -> (wx_nickname, app_id)
    channel_di = {}
    for raw in _iter_json_objects(channel_text):
        channel = json.loads(raw, strict=False)
        channel_di[channel["channel_id"]] = (
            channel["wx_nickname"], channel['app_id'])
    print(f'{stage} 有channel数:{len(channel_di)}')

    info = db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
    stage = info[0][0]
    path = info[0][1]
    # The export can contain an empty `"referral_url":,` field, which is not
    # valid JSON; strip it before parsing.
    text = _call_with_retry(requests.get, url=path).text.replace(
        '"referral_url":,', '')

    insert_data = []
    for raw in _iter_json_objects(text):
        try:
            di = json.loads(raw, strict=False)
        except Exception as e:
            # Skip the unparsable record instead of reusing the previous
            # record's values (or hitting NameError on the first one).
            print(raw)
            print(e)
            continue
        nickname, app_id = channel_di[di['channel_id']]
        insert_data.append((
            di["create_time"][:10],
            stage,
            '阳光',
            nickname,
            di['channel_id'],
            di['openid'],
            di['create_time'],
            di['user_createtime'],
            di['money'],
            di.get('book_name'),
            di['merchant_id'],
            2 if di['state'] == "完成" else 1,
            di['user_id'],
            app_id,
            1 if di['type'] == '书币充值' else 2,
            di['merchant_id'],
            di['transaction_id']
        ))

    print("订单数:" + str(len(insert_data)))
    save_data(insert_data)
    return insert_data


def save_data(data):
    """Upsert order rows into ``ods_order``; a no-op for empty/``None`` data.

    Args:
        data: sequence of 17-item tuples in the column order of the SQL
            statement below.
    """
    if not data:
        return
    sql = """replace into ods_order(date,stage,platform,channel,channel_id,user_id,order_time,
    reg_time,amount,from_novel,order_id,status,platform_user_id,wechat_app_id,order_type,
    trade_no,transaction_no) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
    db.quchen_text.executeMany(sql, data)


def check():
    """Wait until every order export task has a path recorded.

    Polls up to 10 times, sleeping 60 seconds after each unsuccessful poll.
    If tasks are still pending afterwards, a DingTalk alert is sent.

    Returns:
        ``'回调完成'`` when nothing is pending, otherwise ``'回调未完成'``.
    """
    for _ in range(10):
        pending = db.quchen_text.getOne("select count(1) from yangguang_path where type ='order' and path is null")
        print(f" 回调接口 待处理数量 {pending} ")
        if pending == 0:
            return '回调完成'
        time.sleep(60)
    DingTalkUtils().send('阳光订单回调延时10min', '15168342316')
    return '回调未完成'


def get_account(plactform, id=None):
    """Load account records for a platform from ``order_account_text``.

    Args:
        plactform: platform name to filter on (parameter spelling kept for
            existing callers).
        id: optional row id to narrow the query.

    Returns:
        A list of lists: each row's ``text`` column with newlines removed
        and split on commas. Callers in this file read index 0 as the stage
        and index 1 as the vip id.
    """
    op = f" and id={id} " if id else ''
    data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}")
    return [row[0].replace('\n', '').split(",") for row in data]


def get_yg_user_info(start=None, end=None):
    """Request order-export tasks for every account and wait for them.

    NOTE(review): the original docstring described this as fetching
    Yangguang user information, but the code issues the same order export
    request as ``yangguang`` and does not parse the result.

    Args:
        start, end: optional ``YYYY-MM-DD`` bounds, resolved like in
            ``yangguang``. They were undefined names in the original body,
            so calling the function always raised ``NameError``.
    """
    accounts = get_account("阳光")
    start, end = _resolve_window(start, end)
    if not accounts:
        return
    print(f"阳光账号数:{len(accounts)}")

    for account in accounts:
        stage = account[0]
        vip_id = account[1]
        print(vip_id)
        get_yg_data(stage, vip_id, _YG_CLIENT_ID, _YG_TOKEN, start, end)

    print(check())


def daily_yg():
    """Run ``yangguang`` for ``du.get_n_days(-10)`` .. ``du.get_n_days(-1)``."""
    st = du.get_n_days(-10)
    et = du.get_n_days(-1)
    yangguang(st, et)


if __name__ == '__main__':
    # Default manual run. For a backfill, call yangguang('YYYY-MM-DD',
    # 'YYYY-MM-DD') or daily_yg(); run get_channel_info() first when the
    # channel exports need refreshing.
    yangguang(start=du.get_n_days(-1), end=du.get_n_days(0))