import time from model import ComUtils import requests import json from model.DataBaseUtils import MysqlUtils from model.DateUtils import DateUtils from model.ComUtils import * import math from model.DateUtils import DateUtils import logging from urllib import parse logging.getLogger().setLevel(logging.WARNING) db=MysqlUtils() du=DateUtils() def get_yg_vip_channel(stage, vip_id, client_id, token): url = "https://data.yifengaf.cn:443/channeldata/data/account/list" nonce = ComUtils.get_random_str() timestamp = int(time.time()) signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, } headers = {"Content-Type": "application/json"} r = requests.post(url=url, data=json.dumps(params), headers=headers) print(r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute( f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')") def get_yg_data(stage,vip_id,client_id,token,start,end): url = "https://data.yifengaf.cn:443/channeldata/data/orders/list" nonce=ComUtils.get_random_str() timestamp=int(time.time()) signaure =ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, "start_time":start, # %Y-%m-%d %H:%i:%s: "end_time":end } headers={"Content-Type":"application/json"} r=requests.post(url=url,data=json.dumps(params),headers=headers) print(r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')") def parse_yg_data(vip_id,stage): url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ") r = requests.get(url).text channel_di={} a = r.split('}') for i in a[:-1]: if i[-1] != '}': b=json.loads(i + "}", strict=False) else: b=json.loads(i, strict=False) channel_di[b["channel_id"]]=b["wx_nickname"] # print(channel_di) print(f'{stage} 有channel数:{len(channel_di)}') info=db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}") stage=info[0][0] path=info[0][1] text=requests.get(path).text.replace('"referral_url":,','') insert_data=[] for j in text.split("}")[:-1]: if j[-1] != '}': j=j+'}' try: di=json.loads(j, strict=False) except Exception as e: print(j) print(e) platform = "阳光" channel_id = di["channel_id"] channel=channel_di[channel_id] user_id = di["openid"] order_time = di["create_time"] reg_time = di["user_createtime"] from_novel = di["book_name"] if di['book_name'] else '' amount = di["money"] order_id = di["transaction_id"] if di["transaction_id"] else di['merchant_id'] status = 2 if di['state']=="完成" else 1 date = order_time[:10] insert_data.append((date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id,status)) # print(insert_data) # exit(0) print("订单数:"+ str(insert_data.__len__())) db.quchen_text.executeMany("replace into ods_order values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",tuple(insert_data)) def get_hs_order_task(start, end, account): url = 'https://vip.rlcps.cn/api/getMerchants' apiKey = str(account[0]) apiSecurity = account[1] timestamp = str(int(time.time())) sign = md5(apiKey + timestamp + apiSecurity).upper() params = { 'apiKey': apiKey, 'apiSecurity': apiSecurity, 'timestamp': timestamp, 'sign': sign } response_result_json = requests.post(url, params).json() if 'data' not in response_result_json.keys(): print('花生账号【{apiKey}】本次请求数据异常,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json)) channel_data = response_result_json['data'] # print(f"{account[2]} 有channel{len(channel_data)}个:{str([i['merchant_name'] for i in channel_data])}") li = [] for merchant in channel_data: orders = get_huasheng_order(start, end, account, merchant) li.extend(orders) if len(li) > 0: print(f"花生账号:{account[2]} 有order{len(li)}个") # print(li) db.quchen_text.executeMany("replace into ods_order values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",li) def get_huasheng_order(start,end, account, merchant): li =[] apiKey = str(account[0]) apiSecurity = account[1] stage = account[2] timestamp = str(int(time.time())) order_url = 'https://vip.rlcps.cn/api/orderList' merchant_id = merchant['merchant_id'] merchant_name = merchant['merchant_name'] limit = 500 for date in du.getDateLists(start,end): page = 1 while True: sign = md5(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper() order_params = { 'apiKey': apiKey, 'apiSecurity': apiSecurity, 'timestamp': timestamp, 'date': date, 'merchant_id': merchant_id, 'sign': sign, 'page': page, 'limit': limit } r = requests.post(order_url, order_params) # print(r.text) response_result_json = r.json() if 'data' not in response_result_json.keys(): print('花生账号【{key}】, 查询时间【{date}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据异常,响应报文【{result}】' .format(key=apiKey, date=date, merchant_id=merchant_id, merchant_name=merchant_name, result=response_result_json)) break if len(response_result_json['data']) == 0: break order_item_list = response_result_json['data'] if len(order_item_list) == 0: break for i in order_item_list: li.append( (i['request_at'][:10], stage, '花生', merchant_name, merchant_id, i['openid'], i['request_at'], i['join_at'], i['amount'], i['book_name'], i['trans_id'] if i['trans_id'] != '' else i['order_num'], 2 if i['order_status'] == 1 else 1 ) ) if len(order_item_list) < limit: break else: page = page + 1 return li def save_hs_data(data): sql = 'replace INTO quchen_text.ods_order ' \ ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);' db.quchen_text.executeMany(sql,data) def get_qiyue_order_task(start, end, account): """一分钟请求60次的限制""" order_list = [] # 参数 order_url = "https://api.zhangwenwenhua.com" + "/v1/orders" stage = account[0] token = account[1] size = 50 freq=0 for date in du.getDateLists(start, end): page = 1 while True: timestamp = int(time.time()) url = order_url + "?" + "token=" + str(token) + "×tamp=" + str(timestamp) + "&page=" + str( page) + "&size=" + str(size) + "&date=" + date rsp=requests.get(url=url) response_result_json = rsp.json() # print(response_result_json) freq += 1 if freq == 59: print("一分钟请求60次的限制 等待中") time.sleep(61) freq = 0 code = response_result_json['code'] if code != 0: print(stage, '七悦充值接口异常:', response_result_json) break result_data = response_result_json['data'] total = result_data['total'] if total <= 0: break order_item_list = result_data['data'] for x in order_item_list: create_time = DateUtils.stamp_to_str(x['create_time'],'%Y-%m-%d %H:%M:%S') reg_time = DateUtils.stamp_to_str(x['user_create_time'],'%Y-%m-%d %H:%M:%S') order_list.append(( create_time[:10], stage, '七悦', x['wechat_app_name'], # 公众号名称 x['channel_id'], x['user_open_id'], create_time, reg_time, # 用户注册时间 x['money'], x['book_name'], x['transaction_no'] if x['transaction_no'] != 0 else x['trade_no'], # 订单id x['state']) ) next_page_url = result_data['next_page_url'] if next_page_url is None: break page += 1 # print(len(order_list)) print(f'{stage} [{start}~{end}] 有订单{order_list.__len__()}') if order_list.__len__()>0: save_order(order_list) # print(order_list) def save_order(order_list): db.quchen_text.executeMany('replace into ods_order values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',order_list) print("入库成功") def get_wd_account_siteid_list(account): url = 'https://bi.reading.163.com/dist-api/siteList' consumerkey = account[0] secretkey = account[1] stage = account[3] timestamp = int(time.time() * 1000) siteid_params = { "consumerkey": consumerkey, 'secretkey': secretkey, 'timestamp': timestamp, } sorted_data = sorted(siteid_params.items(), reverse=False) s = "" for k, v in sorted_data: s = s + str(k) + "=" + str(v) sign = md5(s).lower() siteid_params['sign'] = sign consumerkey = siteid_params['consumerkey'] timestamp = siteid_params['timestamp'] parameter = 'consumerkey=' + str(consumerkey) + '×tamp=' + str(timestamp) + '&sign=' + str(sign) get_url = url + "?" + parameter while True: r = requests.get(url=get_url) if r.status_code == 200: break try: id_key_list = r.json()['data'] except: return [] mpid_list = [] try: for id_key_val in id_key_list: mpid = dict(id_key_val)["mpId"] mpid_list.append(mpid) except Exception as e: print(stage, '站点查询返回结果:', r.json()) return mpid_list def get_wending_json_object(url,params): params['timestamp'] = int(time.time()*1000) sorted_data = sorted(params.items(),reverse = False) s="" for k,v in sorted_data: s = s+str(k)+"="+str(v) sign = md5(s).lower() params['sign'] = sign consumerkey = params['consumerkey'] secretkey = params['secretkey'] timestamp = params['timestamp'] siteid = params['siteid'] pageSize = params['pageSize'] starttime = params['starttime'] endtime = params['endtime'] page = params['page'] ## +'&secretkey='+str(secretkey) parameter = 'consumerkey='+str(consumerkey)+'×tamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\ +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&sign='+str(sign) global get_url get_url = url+"?"+parameter while True: r= requests.get(url=get_url) if r.status_code==200: break else: time.sleep(1) print("请求连接出错,等待1s...") response_result_json=r.json() del params['sign'] return response_result_json def get_wd_order_task(start,end,account): order_list = [] url = 'https://bi.reading.163.com/dist-api/rechargeList' consumerkey = account[0] secretkey = account[1] siteid = account[2] stage = account[3] siteid_list = get_wd_account_siteid_list(account) # print(siteid_list) if len(siteid_list) == 0: siteid_list.append(siteid) starttime = du.date_str_to_str(start)+'0000' endtime = du.date_str_to_str(end)+'2359' for siteid in siteid_list: page = 1 while True: params = { 'consumerkey': consumerkey, 'secretkey': secretkey, 'timestamp': int(1601481600), 'siteid': siteid, 'pageSize': 1000, 'starttime': starttime, 'endtime': endtime, 'page': page} response_result_json = get_wending_json_object(url, params) # print(response_result_json) order_item_list = response_result_json['data']['rechargeList'] for x in order_item_list: order_time = DateUtils.stamp_to_str(x['createTime']) reg_time = DateUtils.stamp_to_str(x['userRegisterTime']) order_list.append( (order_time[:10], stage, '文鼎', x['wx_mpName'], x['wx_originalId'], x['wx_user_openId'], order_time, reg_time, x['money'] / 100, x['bookTitle'] if x['bookTitle'] else '', x['ewTradeId'] if x.get('ewTradeId') else x['rechargeUuid'], 2 if x['payStatus'] == 1 else 1 ) ) if len(order_item_list) < 1000: break else: page += 1 print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}") if order_list.__len__()>0: # print(order_list) save_order(order_list) def get_zd_order_task(start,end,account): """开始到结束最多90天""" order_list = [] url = 'https://api.zhangdu520.com/channel/getorder' uid = account[0] appsecert = account[1] channel = account[2] stage = account[3] timestamp = int(time.time()) sign = md5(str(uid) + '&' + appsecert + '&' + str(timestamp)) for i in du.split_date2(start, end, 90): starttime = DateUtils.str_to_stamp(i[0]+' 00:00:00','%Y-%m-%d %H:%M:%S') endtime = DateUtils.str_to_stamp(i[1]+' 23:59:59','%Y-%m-%d %H:%M:%S') page = 1 while True: params = { 'uid': uid, 'timestamp': timestamp, 'sign': sign, 'starttime': starttime, 'endtime': endtime, 'page': page } response_result_json = requests.get(url=url, params=params).json() # print(response_result_json) if 'data' not in response_result_json.keys(): print(f'掌读账号【{uid}】, 查询时间【{i[0]} - {i[1]}】,本次请求数据异常,响应报文【{response_result_json}】') break result_data = response_result_json['data'] page_count = result_data['pageCount'] if page_count == 0: break order_item_list = result_data['list'] for i in order_item_list: order_time = DateUtils.stamp_to_str(i['ctime']) reg_time = DateUtils.stamp_to_str(i['regtime']) order_list.append(( order_time[:10], stage, '掌读', channel, uid, i['openid'], order_time, reg_time, i['amount'], i['book_entry'], i['orderno'], 2 if i['status'] == '1' else 1 )) if page == page_count: # 是最后一页 break page = page + 1 print(f"{channel} [{start}]~[{end}] 有订单 {order_list.__len__()}") if len(order_list) > 0: # print(order_list) save_order(order_list) def get_zzy_order_task(start, end, account): url = 'https://inovel.818tu.com/partners/channel/channels/list?' key = account[0] secert = account[1] stage = account[2] sign = md5(secert + 'key=' + key) params = 'key=' + key + '&sign=' + sign response_result_json = requests.get(url + params).json() # 获取子渠道列表 if 'data' not in response_result_json.keys(): print('掌中云账号【{key}】本次请求数据异常,响应报文【{result}】'.format(key=key, result=response_result_json)) return items = response_result_json['data']['items'] print(f'VIP{account[0]} 有公众号{len(items)} ') total_order_list = [] for channel in items: # 获取channel_id 后逐个拉取历史orders order_list = get_zzy_channel_order(start, end, account, channel) total_order_list.extend(order_list) print(f"{stage} [{start}]~[{end}] 有订单{total_order_list.__len__()}") if len(total_order_list) > 0: save_order(total_order_list) def get_zzy_channel_order(start, end, account, channel): get_time=DateUtils.str_to_date_str(start, f2="%Y-%m-%dT%H:%M:%S+08:00") limit_time=DateUtils.str_to_date_str(end, f2="%Y-%m-%dT%H:%M:%S+08:00") order_list = [] key = account[0] secert = account[1] stage = account[2] order_url = 'https://openapi.818tu.com/partners/channel/orders/list?' channel_id = channel['id'] channel_name = channel['nickname'] status = str(1) page = str(1) per_page = str(1000) gte = parse.urlencode({'created_at[gte]': get_time}) # gte就是ge 大于等于开始时间 lt = parse.urlencode({'created_at[lt]': limit_time}) # 小于 结束时间 while True: sign = md5(secert + 'channel_id=' + str( channel_id) + '&created_at[gte]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str( page) + '&per_page=' + per_page + '&status=' + status) params = 'channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str( page) + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign while True: r = requests.get(order_url + params) if r.status_code == 200: response_result_json = r.json() break else: time.sleep(61) print("掌中云接口调用sleep 61s...") # print(response_result_json) if 'data' not in response_result_json.keys(): print(f'掌中云账号【{key}】,查询时间【{start} - {end}】,渠道【{channel_name}】本次请求数据异常,响应报文【{r.text}】') break total_count = response_result_json['data']['count'] # 总数量 order_item_list = response_result_json['data']['items'] # 订单列表 for i in order_item_list: order_time = DateUtils.str_to_date_str(i['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S") reg_time = DateUtils.str_to_date_str(i['member']['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S") order_list.append(( order_time[:10], stage, '掌中云', channel_name, channel_id, i['member']['openid'], order_time, reg_time, round(i['price'] / 100, 2), i['from_novel']['title'] if str(i['from_novel_id']) != 'None' else '', str(i['id']), 2 if i['status'] == 1 else 1 )) if int(page) >= math.ceil(total_count / int(per_page)): break page = int(page) + 1 # print(f"{channel_name}获取订单:{order_list.__len__()}") # print(order_list) return order_list if __name__ == '__main__': # account = "347347942,e0c361b54a35a55c2b6296b5a80867ce,趣程小程序" # get_hs_order_task('2021-05-01','2021-05-07',account.split(",")) # print(DateUtils.stamp_to_str(1612155476,'%Y-%m-%d %H:%M:%S')[:10]) # exit(0) st= et = '2021-05-07' account = "62140324,KUUxPIokqtIrtvHQ,1025010,趣程19期,qucheng19qi@163.com" get_wd_order_task(st,et,account.split(','))