import hashlib
import time
import datetime
from concurrent.futures import ProcessPoolExecutor

import requests

from util import date_util
from util import platform_config_util
from util import robust_util
from util.MySQLConnection import MySQLConnection

# Page size requested from rechargeList; a shorter page marks the last page.
_WENDING_PAGE_SIZE = 1000
# Seconds to wait before retrying a non-200 response.
_RETRY_INTERVAL_SECONDS = 1
# Per-request socket timeout so a stalled connection cannot hang the job forever.
_REQUEST_TIMEOUT_SECONDS = 60


def md5(s):
    """Return the hex MD5 digest of *s* encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()


def getSelfDateStr(times=None, date_format='%Y%m%d'):
    """Format a Unix timestamp as a local-time date string.

    Added 2020-10-28 for the Yangguang and Wending APIs, whose date
    parameters use the ``20201028`` format (API data is pulled once a day).

    :param times: Unix timestamp in seconds. ``None`` (the default) means
        "now", evaluated at call time.
    :param date_format: ``time.strftime`` format, ``'%Y%m%d'`` by default.
    :return: ``times`` formatted in local time, e.g. ``'20190512'`` for a
        timestamp that falls on 2019-05-12.
    """
    # The old default ``times=time.time()`` was evaluated once at import
    # time, so every default call returned the module-load moment.
    if times is None:
        times = time.time()
    return time.strftime(date_format, time.localtime(times))


def get_wending_account_list():
    """Load Wending API credentials from ``order_account_text``.

    Each row's ``text`` column has its newlines removed and is split on
    commas. Callers index each result as
    ``[consumerkey, secretkey, siteid, stage, ...]``.

    :return: list of per-account string lists.
    """
    sql = "select text from order_account_text where platform='文鼎'"
    con = MySQLConnection()
    try:
        data = con.query(sql)
        return [row['text'].replace('\n', '').split(',') for row in data]
    finally:
        # The connection used to be left open.
        con.close()


def get_wending_order(st, et, account_list):
    """Fetch Wending orders for every account and merge them into one tuple.

    Accounts are processed sequentially. The merged tuple lists the last
    account's orders first, matching the order the previous prepend-based
    implementation produced.

    :param st: start Unix timestamp (seconds).
    :param et: end Unix timestamp (seconds).
    :param account_list: accounts as returned by ``get_wending_account_list``.
    :return: tuple of order rows (see ``get_wending_order_task``).
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    per_account = [get_wending_order_task(st, et, account) for account in account_list]
    # Flatten once instead of re-concatenating tuples (quadratic) per account.
    total_order_list = tuple(
        row for orders in reversed(per_account) for row in orders
    )
    print('文鼎订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def _sign(params):
    """Return the lowercase MD5 request signature for *params*.

    Every ``key=value`` pair is concatenated, with no separator, in
    ascending key order, and the result is MD5-hashed.
    """
    return md5(''.join(f'{k}={v}' for k, v in sorted(params.items()))).lower()


def _get_until_ok(get_url):
    """GET *get_url*, retrying every second until the server returns HTTP 200.

    Non-200 responses are retried indefinitely. Network errors, including
    the per-request timeout, propagate to the caller.
    """
    while True:
        r = requests.get(url=get_url, timeout=_REQUEST_TIMEOUT_SECONDS)
        if r.status_code == 200:
            return r
        time.sleep(_RETRY_INTERVAL_SECONDS)
        print("请求连接出错,等待1s...")


def get_wd_account_siteid_list(account):
    """Return the ``mpId`` of every site listed for a Wending account.

    :param account: ``[consumerkey, secretkey, siteid, stage, ...]``.
    :return: list of ``mpId`` values. Returns ``[]`` when the response body
        is not JSON or has no ``data`` field. If ``data`` is malformed, the
        raw response is printed and the ids parsed so far are returned.
    """
    url = 'https://bi.reading.163.com/dist-api/siteList'
    consumerkey = account[0]
    stage = account[3]
    params = {
        'consumerkey': consumerkey,
        'secretkey': account[1],
        'timestamp': int(time.time() * 1000),  # milliseconds
    }
    sign = _sign(params)
    # secretkey only contributes to the signature; it is not sent in the URL.
    # The old code built '×tamp=' here, a mangled '&timestamp=' separator.
    get_url = (f"{url}?consumerkey={consumerkey}"
               f"&timestamp={params['timestamp']}&sign={sign}")
    r = _get_until_ok(get_url)
    try:
        id_key_list = r.json()['data']
    except (ValueError, KeyError, TypeError):
        # Body is not JSON (ValueError) or lacks a usable 'data' field.
        return []
    mpid_list = []
    try:
        for id_key_val in id_key_list:
            mpid_list.append(dict(id_key_val)["mpId"])
    except (TypeError, ValueError, KeyError):
        print(stage, '站点查询返回结果:', r.json())
    return mpid_list


def get_wending_json_object(url, params):
    """Sign and send one paged Wending query, returning the decoded JSON body.

    ``params['timestamp']`` is overwritten with the current time in
    milliseconds, and the caller's dict keeps that value. Every entry of
    ``params``, including ``secretkey``, goes into the signature, but only
    the keys in ``query_keys`` plus ``sign`` are sent in the URL.

    :param url: endpoint URL without a query string.
    :param params: expected to contain consumerkey, secretkey, siteid,
        pageSize, starttime, endtime, page and paystatus.
    :return: the response parsed by ``Response.json()``.
    """
    params['timestamp'] = int(time.time() * 1000)
    sign = _sign(params)
    query_keys = ('consumerkey', 'timestamp', 'siteid', 'pageSize',
                  'starttime', 'endtime', 'page', 'paystatus')
    # The old code built '×tamp=' here, a mangled '&timestamp=' separator.
    query = '&'.join(f'{key}={params[key]}' for key in query_keys)
    r = _get_until_ok(f'{url}?{query}&sign={sign}')
    return r.json()


def _to_order_row(x, stage):
    """Convert one ``rechargeList`` item into a row tuple.

    The column order is amount, channel, channel_id, date, from_novel,
    order_id, order_time, platform, reg_time, stage, user_id. This matches
    the INSERT statement in ``batch_save_order``.
    """
    # createTime / userRegisterTime are 13-digit (millisecond) timestamps.
    order_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['createTime'] // 1000))
    reg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['userRegisterTime'] // 1000))
    # Local midnight of the order day, as a Unix timestamp.
    order_date = int(time.mktime(time.strptime(order_time[:10], '%Y-%m-%d')))
    return (
        x['money'] / 100,     # amount: source unit is fen, converted to yuan
        x['wx_mpName'],       # channel: official-account name
        x['wx_originalId'],   # channel_id: official-account id
        order_date,           # date
        x['bookTitle'],       # from_novel: novel title
        x['ewTradeId'],       # order_id
        order_time,           # order_time
        '文鼎',                # platform
        reg_time,             # reg_time: user registration time
        stage,                # stage
        x['userId'],          # user_id: paying user uid
    )


def get_wending_order_task(st, et, account):
    """Fetch all recharge orders (``paystatus=1``) of one Wending account.

    Queries every site returned by ``get_wd_account_siteid_list``. If that
    list is empty, it falls back to the account's own siteid. For each site
    it pages through ``rechargeList`` until a short page is returned.

    :param st: start Unix timestamp (seconds).
    :param et: end Unix timestamp (seconds).
    :param account: ``[consumerkey, secretkey, siteid, stage, ...]``.
    :return: tuple of order rows in INSERT column order.
    """
    url = 'https://bi.reading.163.com/dist-api/rechargeList'
    consumerkey = account[0]
    secretkey = account[1]
    default_siteid = account[2]
    stage = account[3]
    siteid_list = get_wd_account_siteid_list(account)
    print(siteid_list)
    if not siteid_list:
        siteid_list.append(default_siteid)
    # Format the time window as local-time YYYYMMDDHHMM strings.
    starttime = getSelfDateStr(st, '%Y%m%d%H%M')
    endtime = getSelfDateStr(et, '%Y%m%d%H%M')
    order_list = []
    for siteid in siteid_list:
        page = 1
        while True:
            # 'timestamp' is filled in by get_wending_json_object.
            params = {
                'consumerkey': consumerkey,
                'secretkey': secretkey,
                'siteid': siteid,
                'pageSize': _WENDING_PAGE_SIZE,
                'starttime': starttime,
                'endtime': endtime,
                'page': page,
                'paystatus': 1,
            }
            response_result_json = get_wending_json_object(url, params)
            order_item_list = response_result_json['data']['rechargeList']
            order_list.extend(_to_order_row(x, stage) for x in order_item_list)
            # A page shorter than the requested size is the last one.
            if len(order_item_list) < _WENDING_PAGE_SIZE:
                break
            page += 1
    print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
    return tuple(order_list)


def batch_save_order(data):
    """Insert order rows into ``quchen_text.order`` using INSERT IGNORE.

    Failures are printed rather than raised (best-effort). The connection
    is always closed.

    :param data: sequence of 11-tuples in the column order of the INSERT
        below. ``None`` or an empty sequence skips the database entirely.
    """
    if not data:
        print('数据为空,不执行数据库操作!')
        return
    sql = ('INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,'
           'order_id,order_time,platform,reg_time,stage,user_id) '
           'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);')
    connect = MySQLConnection()
    try:
        num = connect.batch(sql, data)
        connect.commit()
        print('订单数据最终入库【{num}】条'.format(num=num))
    except Exception as e:
        print('订单数据入库失败:', e)
    finally:
        connect.close()


def start_order_job_wending(st, et):
    """Fetch Wending orders for ``[st, et]`` across all accounts and store them."""
    account_list = get_wending_account_list()
    da = get_wending_order(st, et, account_list)
    batch_save_order(da)


if __name__ == '__main__':
    # Default run: the window from one day back to today (per date_util.get_n_day).
    st = date_util.get_n_day(n=-1, is_timestamp=1)
    et = date_util.get_n_day(n=0, is_timestamp=1)
    start_order_job_wending(st, et)