import time from model import ComUtils import requests import json from model.DataBaseUtils import MysqlUtils from model.DateUtils import DateUtils from model.ComUtils import * import math from model.DateUtils import DateUtils import logging from urllib import parse logging.getLogger().setLevel(logging.WARNING) db=MysqlUtils() du=DateUtils() def get_yg_vip_channel(stage, vip_id, client_id, token): url = "https://data.yifengaf.cn:443/channeldata/data/account/list" nonce = ComUtils.get_random_str() timestamp = int(time.time()) signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, } headers = {"Content-Type": "application/json"} r = requests.post(url=url, data=json.dumps(params), headers=headers) print(r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute( f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')") def get_yg_data(stage,vip_id,client_id,token,start,end): url = "https://data.yifengaf.cn:443/channeldata/data/orders/list" nonce=ComUtils.get_random_str() timestamp=int(time.time()) signaure =ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce)) params = { "client_id": client_id, "token": token, "nonce": nonce, "timestamp": timestamp, "signaure": signaure, "vip_id": vip_id, "start_time":start, # %Y-%m-%d %H:%i:%s: "end_time":end } headers={"Content-Type":"application/json"} r=requests.post(url=url,data=json.dumps(params),headers=headers) print(r.text) task_id = json.loads(r.text).get("data").get("task_id") db.quchen_text.execute(f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')") def parse_yg_data(vip_id,stage): url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ") r = requests.get(url).text channel_di={} a = r.split('}') for i in a[:-1]: if i[-1] != '}': b=json.loads(i + "}", strict=False) else: b=json.loads(i, strict=False) channel_di[b["channel_id"]]=b["wx_nickname"] # print(channel_di) print(f'{stage} 有channel数:{len(channel_di)}') info=db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}") stage=info[0][0] path=info[0][1] text=requests.get(path).text.replace('"referral_url":,','') insert_data=[] for j in text.split("}")[:-1]: if j[-1] != '}': j=j+'}' try: di=json.loads(j, strict=False) except Exception as e: print(j) print(e) if di["state"] == "未完成": continue platform = "阳光" channel_id = di["channel_id"] channel=channel_di[channel_id] user_id = di["openid"] order_time = di["create_time"] reg_time = di["user_createtime"] from_novel = di["book_name"] amount = di["money"] order_id = di["transaction_id"] date = DateUtils.str_to_stamp(order_time[:10]) insert_data.append((date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id)) # print(insert_data) print("订单数:"+ str(insert_data.__len__())) db.quchen_text.executeMany("replace into `order`(date,stage,platform,channel,channel_id," "user_id,order_time,reg_time,amount,from_novel,order_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",tuple(insert_data)) def get_hs_channel(account): url = 'https://vip.rlcps.cn/api/getMerchants' apiKey = str(account[0]) apiSecurity = account[1] timestamp = str(int(time.time())) sign = md5(apiKey + timestamp + apiSecurity).upper() params = { 'apiKey': apiKey, 'apiSecurity': apiSecurity, 'timestamp': timestamp, 'sign': sign } response_result_json = requests.post(url, params).json() if 'data' not in response_result_json.keys(): print('花生账号【{apiKey}】本次请求数据异常,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json)) return return response_result_json['data'] def get_huasheng_order_task(start,end, account, merchant,li): count=0 apiKey = str(account[0]) apiSecurity = account[1] stage = account[2] timestamp = str(int(time.time())) order_url = 'https://vip.rlcps.cn/api/orderList' merchant_id = merchant['merchant_id'] merchant_name = merchant['merchant_name'] limit = 500 for date in du.getDateLists(start,end): page = 1 while True: sign = md5(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper() order_params = { 'apiKey': apiKey, 'apiSecurity': apiSecurity, 'timestamp': timestamp, 'date': date, 'merchant_id': merchant_id, 'sign': sign, 'page': page, 'limit': limit } r = requests.post(order_url, order_params) # print(r.text) response_result_json = r.json() if 'data' not in response_result_json.keys(): print('花生账号【{key}】, 查询时间【{date}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据异常,响应报文【{result}】' .format(key=apiKey, date=date, merchant_id=merchant_id, merchant_name=merchant_name, result=response_result_json)) break if len(response_result_json['data']) == 0: break total_count = response_result_json['count'] order_item_list = response_result_json['data'] for order_item in order_item_list: if order_item['order_status'] == 1: # 1为已支付 order = {} order['user_id'] = order_item['openid'] order['order_id'] = order_item['trans_id'] order['order_time'] = order_item['pay_at'] order['reg_time'] = order_item['join_at'] order['channel'] = merchant_name order['channel_id'] = merchant_id order['platform'] = '花生' order['stage'] = stage order['from_novel'] = order_item['book_name'] order['amount'] = order_item['amount'] order["date"]=int(time.mktime(time.strptime(order_item['pay_at'][:10],"%Y-%m-%d"))) order = sorted(order.items(), key=lambda item: item[0]) order = dict(order) order = tuple(order.values()) if order.__len__()>0: li.append(order) count+=1 if int(page) >= math.ceil(total_count / int(limit)): break page = page + 1 print(f"[{merchant_name}] 订单数: {count}") def save_hs_data(data): sql = 'replace INTO quchen_text.`order` ' \ '(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id)' \ ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);' db.quchen_text.executeMany(sql,data) def get_qiyue_order_task(start, end, account): """一分钟请求60次的限制""" order_list = [] # 参数 order_url = "https://api.zhangwenwenhua.com" + "/v1/orders" stage = account[0] token = account[1] page = 1 size = 50 freq=0 for date in du.getDateLists(start, end): while True: timestamp = int(time.time()) url = order_url + "?" + "token=" + str(token) + "×tamp=" + str(timestamp) + "&page=" + str( page) + "&size=" + str(size) + "&date=" + date response_result_json = requests.get(url=url).json() freq += 1 if freq == 59: print("一分钟请求60次的限制 等待中") time.sleep(61) freq = 0 code = response_result_json['code'] if code != 0: print(stage, '七悦充值接口异常:', response_result_json) break result_data = response_result_json['data'] total = result_data['total'] if total <= 0: break order_item_list = result_data['data'] for x in order_item_list: if int(x['state']) != 2: continue y = ((int(x['create_time']) + 8 * 3600) // 86400 * 86400 - 8 * 3600, stage, '七悦', x['wechat_app_name'], # 公众号名称 x['channel_id'], x['user_id'], time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['create_time'])), time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['user_create_time'])), # 用户注册时间 x['money'], x['book_name'], x['id'] # 订单id ) order_list.append(y) next_page_url = result_data['next_page_url'] if next_page_url is None: break page += 1 # print(len(order_list)) print(f'{stage} [{start}~{end}] 有订单{order_list.__len__()}') if order_list.__len__()>0: save_order(order_list) return order_list def save_order(order_list): db.quchen_text.executeMany('replace into `order` values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',order_list) print("入库成功") def get_wd_account_siteid_list(account): url = 'https://bi.reading.163.com/dist-api/siteList' consumerkey = account[0] secretkey = account[1] stage = account[3] timestamp = int(time.time() * 1000) siteid_params = { "consumerkey": consumerkey, 'secretkey': secretkey, 'timestamp': timestamp, } sorted_data = sorted(siteid_params.items(), reverse=False) s = "" for k, v in sorted_data: s = s + str(k) + "=" + str(v) sign = md5(s).lower() siteid_params['sign'] = sign consumerkey = siteid_params['consumerkey'] timestamp = siteid_params['timestamp'] parameter = 'consumerkey=' + str(consumerkey) + '×tamp=' + str(timestamp) + '&sign=' + str(sign) get_url = url + "?" + parameter while True: r = requests.get(url=get_url) if r.status_code == 200: break try: id_key_list = r.json()['data'] except: return [] mpid_list = [] try: for id_key_val in id_key_list: mpid = dict(id_key_val)["mpId"] mpid_list.append(mpid) except Exception as e: print(stage, '站点查询返回结果:', r.json()) return mpid_list def get_wending_json_object(url,params): params['timestamp'] = int(time.time()*1000) sorted_data = sorted(params.items(),reverse = False) s="" for k,v in sorted_data: s = s+str(k)+"="+str(v) sign = md5(s).lower() params['sign'] = sign consumerkey = params['consumerkey'] secretkey = params['secretkey'] timestamp = params['timestamp'] siteid = params['siteid'] pageSize = params['pageSize'] starttime = params['starttime'] endtime = params['endtime'] page = params['page'] paystatus = params['paystatus'] ## +'&secretkey='+str(secretkey) parameter = 'consumerkey='+str(consumerkey)+'×tamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\ +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&paystatus='+str(paystatus)+'&sign='+str(sign) global get_url get_url = url+"?"+parameter while True: r= requests.get(url=get_url) if r.status_code==200: break else: time.sleep(1) print("请求连接出错,等待1s...") response_result_json=r.json() del params['sign'] return response_result_json def get_wd_order_task(start,end,account): order_list = [] url = 'https://bi.reading.163.com/dist-api/rechargeList' consumerkey = account[0] secretkey = account[1] siteid = account[2] stage = account[3] siteid_list = get_wd_account_siteid_list(account) # print(siteid_list) if len(siteid_list) == 0: siteid_list.append(siteid) starttime = du.date_str_to_str(start)+'0000' endtime = du.date_str_to_str(end)+'2359' for siteid in siteid_list: page = 1 while True: params = { 'consumerkey': consumerkey, 'secretkey': secretkey, 'timestamp': int(1601481600), 'siteid': siteid, 'pageSize': 1000, 'starttime': starttime, 'endtime': endtime, 'page': page, 'paystatus': 1} response_result_json = get_wending_json_object(url, params) order_item_list = response_result_json['data']['rechargeList'] for x in order_item_list: createTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['createTime'] // 1000)) ## 时间戳 》struct_time 》标准时间 uid_reg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['userRegisterTime'] // 1000)) ## 13位时间戳 》标准时间 y = (int(time.mktime(time.strptime(createTime[:10], '%Y-%m-%d'))), stage, '文鼎', x['wx_mpName'], x['wx_originalId'], x['userId'], createTime, uid_reg_time, x['money'] / 100, x['bookTitle'], x['ewTradeId']) order_list.append(y) if len(order_item_list) < 1000: break else: page += 1 print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}") if order_list.__len__()>0: save_order(order_list) def get_zd_order_task(start,end,account): """开始到结束最多90天""" order_list = [] url = 'https://api.zhangdu520.com/channel/getorder' uid = account[0] appsecert = account[1] channel = account[2] stage = account[3] timestamp = int(time.time()) sign = md5(str(uid) + '&' + appsecert + '&' + str(timestamp)) for i in du.split_date2(start, end, 90): starttime = DateUtils.str_to_stamp(i[0]+' 00:00:00','%Y-%m-%d %H:%M:%S') endtime = DateUtils.str_to_stamp(i[1]+' 23:59:59','%Y-%m-%d %H:%M:%S') page = 1 while True: params = { 'uid': uid, 'timestamp': timestamp, 'sign': sign, 'starttime': starttime, 'endtime': endtime, 'page': page } response_result_json = requests.get(url=url, params=params).json() if 'data' not in response_result_json.keys(): print(f'掌读账号【{uid}】, 查询时间【{i[0]} - {i[1]}】,本次请求数据异常,响应报文【{response_result_json}】') break result_data = response_result_json['data'] page_count = result_data['pageCount'] if page_count == 0: break order_item_list = result_data['list'] for order_item in order_item_list: if order_item['status'] != '1': # 1为已支付 continue order_time = order_item['ctime'] order = ( str(DateUtils.stamp_to_date_stamp(int(order_time))), stage, '掌读', channel, uid, order_item['openid'], order_time, order_item['regtime'], order_item['amount'], order_item.get('book_entry', ''), str(order_item['orderno']) ) order_list.append(order) if page == page_count: # 是最后一页 break page = page + 1 print(f"{channel} [{start}]~[{end}] 有订单 {order_list.__len__()}") if len(order_list) > 0: save_order(order_list) def get_zzy_order_task(start, end, account): url = 'https://inovel.818tu.com/partners/channel/channels/list?' key = account[0] secert = account[1] stage = account[2] sign = md5(secert + 'key=' + key) params = 'key=' + key + '&sign=' + sign response_result_json = requests.get(url + params).json() # 获取子渠道列表 if 'data' not in response_result_json.keys(): print('掌中云账号【{key}】本次请求数据异常,响应报文【{result}】'.format(key=key, result=response_result_json)) return items = response_result_json['data']['items'] print(f'VIP{account[0]} 有公众号{len(items)} ') total_order_list = [] for channel in items: # 获取channel_id 后逐个拉取历史orders order_list = get_zzy_channel_order(start, end, account, channel) total_order_list.extend(order_list) print(f"{stage} [{start}]~[{end}] 有订单{total_order_list.__len__()}") if total_order_list > 0: save_order(total_order_list) def get_zzy_channel_order(start, end, account, channel): get_time=DateUtils.str_to_date_str(start, f2="%Y-%m-%dT%H:%M:%S+08:00") limit_time=DateUtils.str_to_date_str(end, f2="%Y-%m-%dT%H:%M:%S+08:00") order_list = [] key = account[0] secert = account[1] stage = account[2] order_url = 'https://openapi.818tu.com/partners/channel/orders/list?' channel_id = channel['id'] channel_name = channel['nickname'] status = str(1) page = str(1) per_page = str(1000) gte = parse.urlencode({'created_at[gte]': get_time}) # gte就是ge 大于等于开始时间 lt = parse.urlencode({'created_at[lt]': limit_time}) # 小于 结束时间 while True: sign = md5(secert + 'channel_id=' + str( channel_id) + '&created_at[gte]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str( page) + '&per_page=' + per_page + '&status=' + status) params = 'channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str( page) + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign while True: r = requests.get(order_url + params) if r.status_code == 200: response_result_json = r.json() break else: time.sleep(61) print("掌中云接口调用sleep 61s...") if 'data' not in response_result_json.keys(): print(f'掌中云账号【{key}】,查询时间【{start} - {end}】,渠道【{channel_name}】本次请求数据异常,响应报文【{r.text}】') break total_count = response_result_json['data']['count'] # 总数量 order_item_list = response_result_json['data']['items'] # 订单列表 for order_item in order_item_list: order_time = DateUtils.str_to_date_str(order_item['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S") order = ( DateUtils.str_to_stamp(order_time[:10]), stage, '掌中云', channel_name, channel_id, str(order_item['member']['openid']), order_item['created_at'], order_item['member']['created_at'], round(order_item['price'] / 100, 2), order_item['from_novel']['title'] if str(order_item['from_novel_id']) != 'None' else '', str(order_item['id']) ) order_list.append(order) if int(page) >= math.ceil(total_count / int(per_page)): break page = int(page) + 1 # print(f"{channel_name}获取订单:{order_list.__len__()}") return order_list def get_yueweng_order_task(start, end, account): pass if __name__ == '__main__': get_qiyue_order_task('2021-02-01','2021-02-19',['趣程15期','eyJpdiI6ImluVWxoRUl3VTR6QU5hamlYOFBvXC9BPT0iLCJ2YWx1ZSI6Ik5IZ0N4dm5GcmJ0Zklsd0tNZ1JVSVE9PSIsIm1hYyI6IjJjODUzMjdlZTc2ODI2ZjFmY2QyYmU5MGViYTkzOGU4MDEwZTIyODIxOTE4NzgzYTNhOGQ1YWM4OGJkMDAzMmIifQ=='])