#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 20201104
# 20201106: added the Wending (文鼎) platform order interface.
#
# Shape of the Wending recharge-list response, as recorded by the original
# author:
#
#   {
#     "code": 200,
#     "data": {
#       "totalPage": 1,
#       "rechargeList": [
#         {
#           "userId": 1067,
#           "nickName": "...",
#           "userFollowTime": 1512717067449,
#           "wx_originalId": "gh_3f1fc031329d",
#           "wx_mpName": "...",
#           "wx_user_openId": "...",
#           "rechargeUuid": "1fd47b51-1256-43ce-a72d-3266e23235ba",
#           "rechargeMethod": 1,
#           "money": 100,
#           "createTime": 1512717069449,
#           "payStatus": 0
#         }
#       ]
#     }
#   }
#
# Paid orders observed by the original author additionally carry
# 'userRegisterTime', 'bookTitle', 'ewTradeId', 'payTime' and 'updateTime'.

import hashlib
import time
import datetime
from concurrent.futures import ProcessPoolExecutor

import requests
from apscheduler.schedulers.blocking import BlockingScheduler

from util import date_util
from util import platform_config_util  # account configuration
from util import robust_util
from util.MySQLConnection import MySQLConnection

# import random
# import math
# from urllib import parse

_SITE_LIST_URL = 'https://bi.reading.163.com/dist-api/siteList'
_RECHARGE_LIST_URL = 'https://bi.reading.163.com/dist-api/rechargeList'
_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Query-string order used for the recharge-list request. 'secretkey' takes
# part in the signature but is deliberately not sent in the URL.
_RECHARGE_QUERY_KEYS = (
    'consumerkey',
    'timestamp',
    'siteid',
    'pageSize',
    'starttime',
    'endtime',
    'page',
    'paystatus',
)

# URL of the most recent recharge-list request. get_wending_json_object()
# publishes it here so that callers can include it in error logs.
get_url = ''


def md5(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of *s*."""
    digest = hashlib.md5()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()


def getSelfDateStr(times=None, date_format='%Y%m%d'):
    """Format a Unix timestamp (seconds) as a local-time string.

    Added 20201028: the Yangguang and Wending interfaces take dates such as
    ``20201028``.

    :param times: Unix timestamp in seconds. ``None`` (the default) means
        "now", evaluated at call time rather than once at import time.
    :param date_format: ``time.strftime`` format string.
    :return: the formatted local time as ``str``.
    """
    if times is None:
        times = time.time()
    return time.strftime(date_format, time.localtime(times))


def get_wending_account_list():
    """Return the configured Wending accounts.

    :return: list of rows shaped like
        ``['consumerkey', 'secretkey', 'siteid', 'stage', 'account']``.
    """
    return platform_config_util.get_account_list('文鼎', 'wending_account_config.csv')


def _sign_params(params):
    """Return the lowercase MD5 signature over *params*.

    The signed payload is every ``key=value`` pair, sorted by key and
    concatenated without any separator.
    """
    payload = "".join(f"{key}={value}" for key, value in sorted(params.items()))
    return md5(payload).lower()


# Accounts are fetched concurrently by a pool of 5 worker processes.
@robust_util.catch_exception
def get_wending_order(st, et, account_list):
    """Fetch the orders of every account in *account_list* concurrently.

    :param st: start of the query window (Unix seconds).
    :param et: end of the query window (Unix seconds).
    :param account_list: account rows as returned by
        ``get_wending_account_list``.
    :return: tuple of order rows (one tuple per order).
    """
    start_exec_seconds = date_util.getCurrentSecondTime()

    # Leaving the ``with`` block waits for every submitted task to finish.
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(get_wending_order_task, st, et, account)
            for account in account_list
        ]

    total_order_list = ()
    for future in futures:
        order_list = future.result()
        if len(order_list) > 0:
            total_order_list = order_list + total_order_list

    print('文鼎订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_wd_account_siteid_list(account):
    """Return the ``mpId`` of every site that belongs to *account*.

    :param account: account row; index 0 is the consumer key, 1 the secret
        key and 3 the stage label used in log output.
    :return: list of ``mpId`` values; empty when the interface does not
        answer with code 200.

    Example of a successful response::

        {'code': 200,
         'data': [{'mpId': 1023064, 'mpName': '...',
                   'siteList': [{'id': 1023064, 'domain': '...',
                                 'name': '...', 'fileKey': '...'}]}],
         'message': 'success'}
    """
    consumerkey = account[0]
    secretkey = account[1]
    stage = account[3]
    timestamp = int(time.time() * 1000)  # the interface expects milliseconds

    siteid_params = {
        "consumerkey": consumerkey,
        'secretkey': secretkey,
        'timestamp': timestamp,
    }
    sign = _sign_params(siteid_params)
    siteid_params['sign'] = sign

    # The secret key is part of the signature only; it is never sent.
    request_url = (
        f"{_SITE_LIST_URL}?consumerkey={consumerkey}"
        f"&timestamp={timestamp}&sign={sign}"
    )
    response_result_json = requests.get(url=request_url).json()

    # Initialised up front so the failure path returns an empty list instead
    # of hitting an unbound local.
    mpid_list = []
    if response_result_json.get('code') != 200:
        print(stage, '文鼎siteid站点接口异常:', response_result_json, '传入参数',
              siteid_params, "请求url", request_url)
    else:
        try:
            for id_key_val in response_result_json['data']:
                mpid_list.append(dict(id_key_val)["mpId"])
        except (KeyError, TypeError, ValueError):
            # Malformed payload: keep whatever was collected so far.
            print(stage, '站点查询返回结果:', response_result_json)

    print(stage, '文鼎siteid列表:', mpid_list)
    return mpid_list


def get_wending_json_object(url, params):
    """Sign *params*, GET *url* with them and return the decoded JSON.

    Side effects: ``params['timestamp']`` is overwritten with the current
    time in milliseconds, and the module-level ``get_url`` is set to the URL
    that was requested. The temporary ``sign`` entry is removed again before
    returning, even when the request fails.

    :param url: endpoint URL without a query string.
    :param params: dict holding 'consumerkey', 'secretkey', 'siteid',
        'pageSize', 'starttime', 'endtime', 'page' and 'paystatus'.
    :return: the decoded JSON response.
    """
    global get_url

    params['timestamp'] = int(time.time() * 1000)
    params['sign'] = _sign_params(params)
    try:
        query = '&'.join(f"{key}={params[key]}" for key in _RECHARGE_QUERY_KEYS)
        get_url = f"{url}?{query}&sign={params['sign']}"
        return requests.get(url=get_url).json()
    finally:
        params.pop('sign', None)


def _build_order_row(item, stage):
    """Convert one recharge record into a row for ``batch_save_order``.

    The tuple follows the column order of the INSERT statement: amount,
    channel, channel_id, date, from_novel, order_id, order_time, platform,
    reg_time, stage, user_id.
    """
    # The platform reports 13-digit (millisecond) timestamps.
    pay_seconds = int(item['payTime'] // 1000)
    # Truncate the payment time to 00:00 of its UTC+8 day.
    day_start = (pay_seconds + 8 * 3600) // 86400 * 86400 - 8 * 3600
    order_time = time.strftime(
        _DATETIME_FORMAT, time.localtime(item['createTime'] // 1000))
    reg_time = time.strftime(
        _DATETIME_FORMAT, time.localtime(item['userRegisterTime'] // 1000))
    return (
        item['money'] / 100,    # amount; the source unit is 1/100 yuan
        item['wx_mpName'],      # channel: official-account name
        item['wx_originalId'],  # channel_id: official-account id
        day_start,              # date
        item['bookTitle'],      # from_novel
        item['ewTradeId'],      # order_id
        order_time,             # order_time: when the order was created
        '文鼎',                  # platform
        reg_time,               # reg_time: when the user registered
        stage,                  # stage
        item['userId'],         # user_id: the paying user
    )


def get_wending_order_task(st, et, account):
    """Fetch every paid order of one account within ``[st, et)``.

    :param st: start of the query window (Unix seconds).
    :param et: end of the query window (Unix seconds).
    :param account: account row; index 0 is the consumer key, 1 the secret
        key and 3 the stage label stored with every order.
    :return: tuple of order rows; empty when the window is empty or the
        account has no sites.
    """
    if st >= et:
        return ()

    consumerkey = account[0]
    secretkey = account[1]
    stage = account[3]
    starttime = getSelfDateStr(st, '%Y%m%d%H%M')
    endtime = getSelfDateStr(et, '%Y%m%d%H%M')

    rows = []
    for siteid in get_wd_account_siteid_list(account):
        params = {
            'consumerkey': consumerkey,
            'secretkey': secretkey,
            'siteid': siteid,
            'pageSize': 1000,
            'starttime': starttime,
            'endtime': endtime,
            'page': 1,  # always start from the first page of each site
            'paystatus': 1,
        }
        first_page = get_wending_json_object(_RECHARGE_LIST_URL, params)
        if first_page.get('code') != 200:
            print('文鼎查询充值接口异常:', first_page, '传入参数', params,
                  "请求url", get_url)
            break

        total_pages = first_page['data']['totalPage']
        if total_pages <= 0:
            # No orders on this site; the remaining sites may still have some.
            continue

        for page in range(1, total_pages + 1):
            if page == 1:
                # Already fetched while reading totalPage.
                page_json = first_page
            else:
                params['page'] = page
                page_json = get_wending_json_object(_RECHARGE_LIST_URL, params)
                if page_json.get('code') != 200:
                    print('文鼎查询充值接口异常:', page_json, '传入参数', params,
                          "请求url", get_url)
                    break
            rows.extend(
                _build_order_row(item, stage)
                for item in page_json['data']['rechargeList']
            )

    order_list = tuple(rows)
    print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条,例如》{order_list[0:1]}")
    return order_list


def batch_save_order(data):
    """Insert the order rows in *data* into ``quchen_text.order``.

    Nothing is written when *data* is ``None`` or empty. Database errors are
    reported on stdout and not re-raised; the connection is always closed.

    :param data: sequence of rows in the column order of the INSERT below.
    """
    if not data:
        print('数据为空,不执行数据库操作!')
        return

    sql = 'INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    connect = MySQLConnection()
    try:
        num = connect.batch(sql, data)
        connect.commit()
        print(f'订单数据最终入库【{num}】条')
    except Exception as e:
        print('订单数据入库失败:', e)
    finally:
        connect.close()


def start_order_job():
    """Sync yesterday's Wending orders for every configured account.

    The query window runs from ``date_util.getYesterdayStartTime()`` to
    ``date_util.getTodayStartTime()`` (Wending support added 20201105).

    For a manual test, replace ``account_list`` with hand-built rows shaped
    like ``['<consumerkey>', '<secretkey>', '<siteid>', '<stage>',
    '<account email>']``; real keys belong in the account configuration,
    not in this file.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    st_unix = date_util.getYesterdayStartTime()
    et_unix = date_util.getTodayStartTime()
    account_list = get_wending_account_list()

    batch_save_order(get_wending_order(st_unix, et_unix, account_list))
    print('订单同步执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)


# First run of the daily job; it then repeats every 24 hours.
start_job_time = '2020-11-11 02:00:00'

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(start_order_job, 'interval', days=1, start_date=start_job_time)
    scheduler.start()