import datetime
import hashlib
import time
from concurrent.futures import ProcessPoolExecutor

import requests

from util import date_util
from util import platform_config_util
from util import robust_util
from util.MySQLConnection import MySQLConnection

# Base URL of the Wending (文鼎) distribution API used by this job.
_WD_API_BASE = 'https://bi.reading.163.com/dist-api'
# Page size requested from rechargeList; a page shorter than this is the last one.
_WD_PAGE_SIZE = 1000
# Maximum number of attempts for one HTTP request before giving up.
_MAX_HTTP_ATTEMPTS = 30
# Seconds to wait between failed attempts (matches the "等待1s" log message).
_RETRY_SLEEP_SECONDS = 1
# Per-request network timeout in seconds, so a stalled connection cannot hang the job.
_HTTP_TIMEOUT_SECONDS = 30
# Column order of the INSERT in batch_save_order_new; every order tuple built by
# get_wending_order_task follows exactly this order.
_ORDER_COLUMNS = ('amount', 'channel', 'channel_id', 'date', 'from_novel', 'order_id',
                  'order_time', 'platform', 'reg_time', 'stage', 'user_id')


def md5(s):
    """Return the hex MD5 digest of the UTF-8 encoding of ``s``."""
    digest = hashlib.md5()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()


def getSelfDateStr(times=None, date_format='%Y%m%d'):
    """Format a Unix timestamp (in seconds) as a local-time string.

    Added 2020-10-28 for the Yangguang and Wending APIs, whose date parameters use
    the ``20201028`` style.

    :param times: Unix timestamp in seconds; ``None`` (the default) means "now".
        The former default ``time.time()`` was evaluated only once, at import time,
        so long-running processes silently used a stale "now".
    :param date_format: ``time.strftime`` format string, ``'%Y%m%d'`` by default.
    :return: ``times`` rendered in local time with ``date_format``,
        e.g. ``'20190512'`` for a timestamp on 2019-05-12.
    """
    if times is None:
        times = time.time()
    return time.strftime(date_format, time.localtime(times))


def _sign_params(params):
    """Return the Wending request signature for ``params``.

    The signature is the lowercase MD5 of ``k1=v1k2=v2...`` over every entry,
    sorted by key. All entries are signed, including ``secretkey``, even though
    the callers below do not send ``secretkey`` in the query string.
    """
    plain = "".join(str(key) + "=" + str(value) for key, value in sorted(params.items()))
    return md5(plain).lower()


def _get_with_retry(url, query):
    """GET ``url`` with ``query`` until an HTTP 200 arrives and return the response.

    Non-200 statuses and network errors (including timeouts) are retried up to
    _MAX_HTTP_ATTEMPTS times, sleeping _RETRY_SLEEP_SECONDS between attempts.

    :raises RuntimeError: if no 200 response was obtained within the attempt limit.
    """
    last_error = None
    for attempt in range(1, _MAX_HTTP_ATTEMPTS + 1):
        try:
            r = requests.get(url, params=query, timeout=_HTTP_TIMEOUT_SECONDS)
        except requests.RequestException as e:
            last_error = e
        else:
            if r.status_code == 200:
                return r
            last_error = 'HTTP ' + str(r.status_code)
        if attempt < _MAX_HTTP_ATTEMPTS:
            time.sleep(_RETRY_SLEEP_SECONDS)
            print("请求连接出错,等待1s...")
    raise RuntimeError('GET {} failed after {} attempts: {}'.format(url, _MAX_HTTP_ATTEMPTS, last_error))


def get_wending_account_list():
    """Load the Wending account rows from ``order_account_text``.

    Each row's ``text`` column has its newlines removed and is split on commas.

    :return: a list of string lists. The callers in this file read index 0 as
        consumerkey, 1 as secretkey, 2 as the default siteid and 3 as the stage.
    """
    sql = "select text from order_account_text where platform='文鼎'"
    con = MySQLConnection()
    try:
        data = con.query(sql)
        # Materialise rows before closing, in case query() returns a lazy iterable.
        return [row['text'].replace('\n', '').split(',') for row in data]
    finally:
        con.close()


def get_wending_order(st, et, account_list):
    """Fetch the orders of every account in ``account_list`` between ``st`` and ``et``.

    :param st: start Unix timestamp (seconds).
    :param et: end Unix timestamp (seconds).
    :param account_list: account rows as returned by get_wending_account_list.
    :return: a tuple of order tuples, with later accounts' orders first.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    per_account = [get_wending_order_task(st, et, account) for account in account_list]
    # The original built the result as `new + total` per account (later accounts
    # first); extending in reverse keeps that order without quadratic concatenation.
    total_order_list = []
    for orders in reversed(per_account):
        total_order_list.extend(orders)
    total_order_list = tuple(total_order_list)
    print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_wd_account_siteid_list(account):
    """Return the ``mpId`` of every site listed by the siteList API for ``account``.

    :param account: account row (see get_wending_account_list). It uses index 0
        (consumerkey) and index 1 (secretkey), plus index 3 (stage) for logging.
    :return: a list of mpId values. It is empty when the response has no usable
        ``data``, and may be partial if an entry is malformed (the raw response is
        printed in that case).
    """
    consumerkey = account[0]
    secretkey = account[1]
    stage = account[3]
    signed = {
        'consumerkey': consumerkey,
        'secretkey': secretkey,
        'timestamp': int(time.time() * 1000),
    }
    # secretkey is signed but deliberately not sent.
    query = {
        'consumerkey': consumerkey,
        'timestamp': signed['timestamp'],
        'sign': _sign_params(signed),
    }
    r = _get_with_retry(_WD_API_BASE + '/siteList', query)
    try:
        id_key_list = r.json()['data']
    except (ValueError, KeyError, TypeError):
        return []
    mpid_list = []
    try:
        for id_key_val in id_key_list:
            mpid_list.append(dict(id_key_val)["mpId"])
    except (KeyError, TypeError, ValueError):
        print(stage, '站点查询返回结果:', r.json())
    return mpid_list


def get_wending_json_object(url, params):
    """Sign ``params``, GET ``url`` and return the decoded JSON body.

    :param url: API endpoint, e.g. the rechargeList URL.
    :param params: dict that must contain consumerkey, secretkey, siteid, pageSize,
        starttime, endtime, page and paystatus. Its ``timestamp`` entry is set to
        the current time in milliseconds (the dict is mutated). ``secretkey`` is
        signed but not sent.
    :return: the parsed JSON response.
    :raises RuntimeError: if the endpoint never answers with HTTP 200.
    """
    params['timestamp'] = int(time.time() * 1000)
    sign = _sign_params(params)
    query = {key: params[key] for key in ('consumerkey', 'timestamp', 'siteid', 'pageSize',
                                          'starttime', 'endtime', 'page', 'paystatus')}
    query['sign'] = sign
    return _get_with_retry(url, query).json()


def _format_ms_timestamp(ms):
    """Render a millisecond Unix timestamp as local ``YYYY-mm-dd HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ms // 1000))


def _wending_order_row(x, stage):
    """Convert one rechargeList item into a tuple ordered like _ORDER_COLUMNS."""
    create_time = _format_ms_timestamp(x['createTime'])
    row = {
        'platform': '文鼎',
        'channel': x['wx_mpName'],  # official-account name
        'channel_id': x['wx_originalId'],  # official-account id
        'from_novel': x['bookTitle'],  # novel title
        'user_id': x['userId'],  # paying user's uid
        'stage': stage,  # account stage/period
        'order_time': create_time,  # order creation time
        'amount': x['money'] / 100,  # source unit is fen (1/100 yuan)
        'reg_time': _format_ms_timestamp(x['userRegisterTime']),  # user registration time
        'order_id': x['ewTradeId'],
        'date': create_time,
    }
    return tuple(row[column] for column in _ORDER_COLUMNS)


def get_wending_order_task(st, et, account):
    """Fetch all paid orders of one account between ``st`` and ``et``.

    Every site of the account is queried, falling back to the account's own siteid
    (index 2) when siteList returns nothing. Each site's rechargeList is paged
    until a page shorter than _WD_PAGE_SIZE is returned.

    :return: a tuple of order tuples in _ORDER_COLUMNS order.
    """
    order_list = []
    url = _WD_API_BASE + '/rechargeList'
    consumerkey = account[0]
    secretkey = account[1]
    default_siteid = account[2]
    stage = account[3]
    siteid_list = get_wd_account_siteid_list(account)
    print(siteid_list)
    if len(siteid_list) == 0:
        siteid_list.append(default_siteid)
    starttime = getSelfDateStr(st, '%Y%m%d%H%M')
    endtime = getSelfDateStr(et, '%Y%m%d%H%M')
    for siteid in siteid_list:
        page = 1
        while True:
            # 'timestamp' is filled in by get_wending_json_object at request time.
            params = {
                'consumerkey': consumerkey,
                'secretkey': secretkey,
                'siteid': siteid,
                'pageSize': _WD_PAGE_SIZE,
                'starttime': starttime,
                'endtime': endtime,
                'page': page,
                'paystatus': 1,
            }
            response_result_json = get_wending_json_object(url, params)
            order_item_list = response_result_json['data']['rechargeList']
            order_list.extend(_wending_order_row(x, stage) for x in order_item_list)
            if len(order_item_list) < _WD_PAGE_SIZE:
                break
            page += 1
    print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
    return tuple(order_list)


def batch_save_order_new(data):
    """Insert order tuples into ``quchen_text.ods_order`` with INSERT IGNORE.

    Each tuple must follow the column order of the INSERT statement. Empty or
    ``None`` input is skipped. Database errors are printed rather than raised, and
    the connection is always closed.
    """
    if data is None or len(data) == 0:
        print('数据为空,不执行数据库操作!')
        return
    sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    connect = MySQLConnection()
    try:
        num = connect.batch(sql, data)
        connect.commit()
        print('订单数据最终入库【{num}】条'.format(num=num))
    except Exception as e:
        print('订单数据入库失败:', e)
    finally:
        connect.close()


def start_order_job_wending(st, et):
    """Pull Wending orders for [st, et] (Unix seconds) from all accounts and store them."""
    account_list = get_wending_account_list()
    da = get_wending_order(st, et, account_list)
    batch_save_order_new(da)


if __name__ == '__main__':
    # The original called start_order_job_wending() without its two required
    # arguments, which raised TypeError. This defaults to the last 24 hours.
    # NOTE(review): confirm the window the scheduler actually expects.
    end_ts = int(time.time())
    start_ts = end_ts - 24 * 60 * 60
    start_order_job_wending(start_ts, end_ts)