123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- import hashlib
- import time
- import datetime
- from concurrent.futures import ProcessPoolExecutor
- import requests
- from util import date_util
- from util import platform_config_util
- from util import robust_util
- from util.MySQLConnection import MySQLConnection
def md5(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of *s*."""
    digest = hashlib.md5(s.encode("utf-8"))
    return digest.hexdigest()
def getSelfDateStr(times=None, date_format='%Y%m%d'):
    """Format a Unix timestamp as a local-time date string.

    Added 2020-10-28 for the Yangguang and Wending APIs, whose date
    parameters use the compact form ``20201028`` (API data is pulled once a
    day).

    Args:
        times: Unix timestamp in seconds. Defaults to the current time,
            evaluated at call time.
        date_format: ``time.strftime`` format string, ``'%Y%m%d'`` by default.

    Returns:
        The timestamp rendered in local time with ``date_format``. For example,
        a timestamp on 2019-05-12 yields ``'20190512'``. This is the date of
        the timestamp itself, not the previous or following day.
    """
    if times is None:
        # A ``time.time()`` default would be frozen at import time and return a
        # stale date in long-running processes; resolve "now" per call instead.
        times = time.time()
    return time.strftime(date_format, time.localtime(times))
def get_wending_account_list():
    """Load Wending (文鼎) API account credentials from MySQL.

    Reads the ``text`` column of every ``order_account_text`` row whose
    ``platform`` is '文鼎'. Newlines are removed and the text is split on commas.

    Returns:
        A list of string lists, one per row. Downstream code reads the fields
        as [consumerkey, secretkey, siteid, stage].
    """
    sql = "select text from order_account_text where platform='文鼎'"
    con = MySQLConnection()
    try:
        rows = con.query(sql)
        # Build the result while the connection is still open, in case the
        # query result is lazily backed by it.
        return [row['text'].replace('\n', '').split(',') for row in rows]
    finally:
        # The original never released the connection.
        con.close()
def get_wending_order(st, et, account_list):
    """Fetch Wending orders for every account and merge them into one tuple.

    Accounts are processed sequentially via ``get_wending_order_task``.

    Args:
        st: Window start, Unix timestamp in seconds.
        et: Window end, Unix timestamp in seconds.
        account_list: Credential rows as produced by ``get_wending_account_list``.

    Returns:
        A tuple of order-row tuples. Orders of later accounts come first, and
        each account's orders keep their own order.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    per_account_orders = [get_wending_order_task(st, et, account) for account in account_list]
    # The original prepended each account's tuple to a growing tuple
    # (``new + total``), copying everything on each step. Extending a list in
    # reverse account order produces the same final ordering in linear time.
    merged = []
    for orders in reversed(per_account_orders):
        merged.extend(orders)
    total_order_list = tuple(merged)
    print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list
def get_wd_account_siteid_list(account):
    """Return the ``mpId`` of every site bound to a Wending account.

    The request is signed as follows. The pairs consumerkey, secretkey and
    timestamp (milliseconds) are sorted by key and concatenated as
    ``key=value`` with no separator. The lowercase MD5 hex digest of that
    string is the signature. The secret key itself is not sent.

    Non-200 responses are retried once per second, indefinitely.

    Args:
        account: Credential row. Index 0 is consumerkey, 1 is secretkey and
            3 is stage (stage is used only for logging).

    Returns:
        A list of ``mpId`` values. Returns ``[]`` when the body is not JSON or
        has no ``data`` field. If an entry cannot be read, the response is
        printed and the ids collected so far are returned.
    """
    url = 'https://bi.reading.163.com/dist-api/siteList'
    consumerkey = account[0]
    secretkey = account[1]
    stage = account[3]
    timestamp = int(time.time() * 1000)
    sign_params = {
        'consumerkey': consumerkey,
        'secretkey': secretkey,
        'timestamp': timestamp,
    }
    sign = md5(''.join(str(k) + '=' + str(v) for k, v in sorted(sign_params.items()))).lower()
    # Fixed: the query previously read '×tamp=' (a mangled '&timestamp='),
    # so the signed timestamp never reached the API.
    get_url = (url + '?consumerkey=' + str(consumerkey)
               + '&timestamp=' + str(timestamp)
               + '&sign=' + sign)
    while True:
        r = requests.get(url=get_url)
        if r.status_code == 200:
            break
        # Back off instead of hammering the endpoint in a tight loop.
        time.sleep(1)
        print("请求连接出错,等待1s...")
    try:
        id_key_list = r.json()['data']
    except (ValueError, KeyError, TypeError):
        # Non-JSON body, missing 'data', or a non-object JSON payload.
        return []
    mpid_list = []
    try:
        for id_key_val in id_key_list:
            mpid_list.append(dict(id_key_val)["mpId"])
    except Exception:
        # Best effort: log the raw response and keep whatever was parsed.
        print(stage, '站点查询返回结果:', r.json())
    return mpid_list
def get_wending_json_object(url, params):
    """Send a signed GET to a Wending endpoint and return the decoded JSON body.

    ``params`` must contain consumerkey, secretkey, siteid, pageSize,
    starttime, endtime, page and paystatus. Its ``timestamp`` entry is
    overwritten with the current time in milliseconds; the caller sees this
    change.

    The signature is the lowercase MD5 of all params, secretkey included,
    sorted by key and concatenated as ``key=value``. The secret key is not
    placed in the query string.

    Non-200 responses are retried once per second, indefinitely. The final
    request URL is also stored in the module-level ``get_url``.

    Returns:
        The parsed JSON response.
    """
    global get_url
    params['timestamp'] = int(time.time() * 1000)
    sign = md5(''.join(str(k) + '=' + str(v) for k, v in sorted(params.items()))).lower()
    # Keep the query order the API was originally called with. This fixes the
    # mangled '×tamp=' that previously replaced '&timestamp='.
    query_keys = ('consumerkey', 'timestamp', 'siteid', 'pageSize',
                  'starttime', 'endtime', 'page', 'paystatus')
    parameter = '&'.join(key + '=' + str(params[key]) for key in query_keys) + '&sign=' + sign
    get_url = url + "?" + parameter
    while True:
        r = requests.get(url=get_url)
        if r.status_code == 200:
            break
        time.sleep(1)
        print("请求连接出错,等待1s...")
    # The signature is never stored in ``params``, so it cannot leak into the
    # caller's dict, even if the request raises.
    return r.json()
def _wending_order_row(item, stage):
    """Convert one Wending recharge record into an ods_order row tuple.

    Fields are in alphabetical column order, matching the INSERT column list
    of ``batch_save_order_new``: amount, channel, channel_id, date,
    from_novel, order_id, order_time, platform, reg_time, stage, user_id.
    """
    # createTime / userRegisterTime are millisecond timestamps.
    order_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(item['createTime'] // 1000))
    reg_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(item['userRegisterTime'] // 1000))
    return (
        item['money'] / 100,    # amount: source unit is fen (1/100 yuan)
        item['wx_mpName'],      # channel: official-account name
        item['wx_originalId'],  # channel_id: official-account id
        order_time,             # date (same value as order_time)
        item['bookTitle'],      # from_novel: novel title
        item['ewTradeId'],      # order_id
        order_time,             # order_time: order creation time
        '文鼎',                  # platform
        reg_time,               # reg_time: user registration time
        stage,                  # stage (account batch/period)
        item['userId'],         # user_id: paying user's uid
    )


def get_wending_order_task(st, et, account):
    """Fetch all paid Wending orders of one account between ``st`` and ``et``.

    The account's site ids come from ``get_wd_account_siteid_list``. If that
    returns nothing, the siteid stored with the account is used instead. Each
    site is paged with pageSize 1000 until a short page is returned.

    Args:
        st: Window start, Unix timestamp in seconds.
        et: Window end, Unix timestamp in seconds.
        account: Credential row [consumerkey, secretkey, siteid, stage, ...].

    Returns:
        A tuple of row tuples, in the layout produced by ``_wending_order_row``.
    """
    url = 'https://bi.reading.163.com/dist-api/rechargeList'
    page_size = 1000
    consumerkey = account[0]
    secretkey = account[1]
    fallback_siteid = account[2]
    stage = account[3]
    siteid_list = get_wd_account_siteid_list(account)
    print(siteid_list)
    if not siteid_list:
        siteid_list = [fallback_siteid]
    starttime = getSelfDateStr(st, '%Y%m%d%H%M')
    endtime = getSelfDateStr(et, '%Y%m%d%H%M')
    rows = []
    for siteid in siteid_list:
        page = 1
        while True:
            params = {
                'consumerkey': consumerkey,
                'secretkey': secretkey,
                'timestamp': 1601481600,  # placeholder; overwritten when the request is signed
                'siteid': siteid,
                'pageSize': page_size,
                'starttime': starttime,
                'endtime': endtime,
                'page': page,
                'paystatus': 1}
            response_result_json = get_wending_json_object(url, params)
            try:
                order_item_list = response_result_json['data']['rechargeList'] or []
            except (KeyError, TypeError):
                # An error payload previously crashed the whole job, losing every
                # account's data; log it and move on to the next site instead.
                print(stage, '文鼎订单查询返回结果异常:', response_result_json)
                break
            # Appending to a list avoids the quadratic cost of tuple concatenation.
            rows.extend(_wending_order_row(x, stage) for x in order_item_list)
            if len(order_item_list) < page_size:
                break
            page += 1
    order_list = tuple(rows)
    print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
    return order_list
def batch_save_order_new(data):
    """Bulk-insert order rows into ``quchen_text.ods_order``.

    The statement is ``INSERT IGNORE``, so MySQL skips rows that would violate
    a unique key instead of failing the batch. Errors are printed rather than
    raised, and the connection is always closed.

    Args:
        data: Sequence of 11-field tuples in column order (amount, channel,
            channel_id, date, from_novel, order_id, order_time, platform,
            reg_time, stage, user_id). ``None`` or an empty sequence means
            there is nothing to do.
    """
    if data is None or len(data) == 0:
        print('数据为空,不执行数据库操作!')
        return
    insert_sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    connection = MySQLConnection()
    try:
        inserted = connection.batch(insert_sql, data)
        connection.commit()
        print('订单数据最终入库【{num}】条'.format(num=inserted))
    except Exception as err:
        print('订单数据入库失败:', err)
    finally:
        connection.close()
def start_order_job_wending(st, et):
    """Run the Wending order sync for the window from ``st`` to ``et``.

    Loads the account credentials, fetches every account's orders, and
    bulk-saves them.

    Args:
        st: Window start, Unix timestamp in seconds.
        et: Window end, Unix timestamp in seconds.
    """
    accounts = get_wending_account_list()
    batch_save_order_new(get_wending_order(st, et, accounts))
if __name__ == '__main__':
    # start_order_job_wending requires a time window; calling it with no
    # arguments raised TypeError. Default to the last 24 hours, in line with
    # the module's once-a-day pull of API data.
    end_ts = int(time.time())
    start_order_job_wending(end_ts - 86400, end_ts)
|