|
@@ -0,0 +1,232 @@
|
|
|
+import hashlib
|
|
|
+import time
|
|
|
+import datetime
|
|
|
+from concurrent.futures import ProcessPoolExecutor
|
|
|
+import requests
|
|
|
+from util import date_util
|
|
|
+from util import platform_config_util
|
|
|
+from util import robust_util
|
|
|
+from util.MySQLConnection import MySQLConnection
|
|
|
+
|
|
|
+
|
|
|
+def md5(s):
|
|
|
+ md5 = hashlib.md5()
|
|
|
+ md5.update(s.encode("utf-8"))
|
|
|
+ return md5.hexdigest()
|
|
|
+
|
|
|
+
|
|
|
+def getSelfDateStr(times=time.time(), date_format='%Y%m%d'):
|
|
|
+ """
|
|
|
+ ## 20201028添加,阳光接口,文鼎接口,日期参数请求格式20201028,一日一拉api数据
|
|
|
+ description: 获取指定时间戳
|
|
|
+ time: 秒 默认当前时间
|
|
|
+ return: 返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
|
|
|
+ tips: 一天86400秒
|
|
|
+ """
|
|
|
+ timestamps = str(time.strftime(date_format, time.localtime(times)))
|
|
|
+ return timestamps
|
|
|
+
|
|
|
+
|
|
|
+def get_wending_account_list():
|
|
|
+ sql = "select text from order_account_text where platform='文鼎'"
|
|
|
+ con = MySQLConnection()
|
|
|
+ data = con.query(sql)
|
|
|
+ li = []
|
|
|
+ for i in data:
|
|
|
+ a = i['text'].replace('\n', '').split(',')
|
|
|
+ li.append(a)
|
|
|
+ return li
|
|
|
+
|
|
|
+
|
|
|
+def get_wending_order(st, et, account_list):
|
|
|
+ total_order_list = ()
|
|
|
+ start_exec_seconds = date_util.getCurrentSecondTime()
|
|
|
+
|
|
|
+ futures = []
|
|
|
+ for account in account_list:
|
|
|
+ futures.append(get_wending_order_task(st, et, account))
|
|
|
+
|
|
|
+ for future in futures:
|
|
|
+
|
|
|
+ if len(future) > 0:
|
|
|
+ total_order_list = future + total_order_list
|
|
|
+
|
|
|
+ print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
|
|
|
+ return total_order_list
|
|
|
+
|
|
|
+
|
|
|
+def get_wd_account_siteid_list(account):
|
|
|
+ url = 'https://bi.reading.163.com/dist-api/siteList'
|
|
|
+
|
|
|
+ consumerkey = account[0]
|
|
|
+ secretkey = account[1]
|
|
|
+ stage = account[3]
|
|
|
+ timestamp = int(time.time() * 1000)
|
|
|
+
|
|
|
+ siteid_params = {
|
|
|
+ "consumerkey": consumerkey,
|
|
|
+ 'secretkey': secretkey,
|
|
|
+ 'timestamp': timestamp,
|
|
|
+ }
|
|
|
+ sorted_data = sorted(siteid_params.items(), reverse=False)
|
|
|
+ s = ""
|
|
|
+ for k, v in sorted_data:
|
|
|
+ s = s + str(k) + "=" + str(v)
|
|
|
+ sign = md5(s).lower()
|
|
|
+ siteid_params['sign'] = sign
|
|
|
+
|
|
|
+ consumerkey = siteid_params['consumerkey']
|
|
|
+ timestamp = siteid_params['timestamp']
|
|
|
+ parameter = 'consumerkey=' + str(consumerkey) + '×tamp=' + str(timestamp) + '&sign=' + str(sign)
|
|
|
+ get_url = url + "?" + parameter
|
|
|
+
|
|
|
+ while True:
|
|
|
+ r = requests.get(url=get_url)
|
|
|
+ if r.status_code == 200:
|
|
|
+ break
|
|
|
+
|
|
|
+ try:
|
|
|
+ id_key_list = r.json()['data']
|
|
|
+ except:
|
|
|
+ return []
|
|
|
+ mpid_list = []
|
|
|
+ try:
|
|
|
+ for id_key_val in id_key_list:
|
|
|
+ mpid = dict(id_key_val)["mpId"]
|
|
|
+ mpid_list.append(mpid)
|
|
|
+ except Exception as e:
|
|
|
+ print(stage, '站点查询返回结果:', r.json())
|
|
|
+
|
|
|
+ return mpid_list
|
|
|
+
|
|
|
+
|
|
|
+def get_wending_json_object(url, params):
|
|
|
+ params['timestamp'] = int(time.time() * 1000)
|
|
|
+ sorted_data = sorted(params.items(), reverse=False)
|
|
|
+ s = ""
|
|
|
+ for k, v in sorted_data:
|
|
|
+ s = s + str(k) + "=" + str(v)
|
|
|
+ sign = md5(s).lower()
|
|
|
+ params['sign'] = sign
|
|
|
+
|
|
|
+ consumerkey = params['consumerkey']
|
|
|
+ secretkey = params['secretkey']
|
|
|
+ timestamp = params['timestamp']
|
|
|
+ siteid = params['siteid']
|
|
|
+ pageSize = params['pageSize']
|
|
|
+ starttime = params['starttime']
|
|
|
+ endtime = params['endtime']
|
|
|
+ page = params['page']
|
|
|
+ paystatus = params['paystatus']
|
|
|
+ ## +'&secretkey='+str(secretkey)
|
|
|
+ parameter = 'consumerkey=' + str(consumerkey) + '×tamp=' + str(timestamp) + '&siteid=' + str(
|
|
|
+ siteid) + '&pageSize=' + str(pageSize) \
|
|
|
+ + '&starttime=' + str(starttime) + '&endtime=' + str(endtime) + '&page=' + str(
|
|
|
+ page) + '&paystatus=' + str(paystatus) + '&sign=' + str(sign)
|
|
|
+ global get_url
|
|
|
+ get_url = url + "?" + parameter
|
|
|
+
|
|
|
+ while True:
|
|
|
+ r = requests.get(url=get_url)
|
|
|
+ if r.status_code == 200:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ time.sleep(1)
|
|
|
+ print("请求连接出错,等待1s...")
|
|
|
+
|
|
|
+ response_result_json = r.json()
|
|
|
+ del params['sign']
|
|
|
+ return response_result_json
|
|
|
+
|
|
|
+
|
|
|
+def get_wending_order_task(st, et, account):
|
|
|
+ order_list = ()
|
|
|
+ url = 'https://bi.reading.163.com/dist-api/rechargeList'
|
|
|
+ consumerkey = account[0]
|
|
|
+ secretkey = account[1]
|
|
|
+ siteid = account[2]
|
|
|
+ stage = account[3]
|
|
|
+ siteid_list = get_wd_account_siteid_list(account)
|
|
|
+ print(siteid_list)
|
|
|
+ if len(siteid_list) == 0:
|
|
|
+ siteid_list.append(siteid)
|
|
|
+
|
|
|
+ starttime = getSelfDateStr(st, '%Y%m%d%H%M')
|
|
|
+ endtime = getSelfDateStr(et, '%Y%m%d%H%M')
|
|
|
+
|
|
|
+ for siteid in siteid_list:
|
|
|
+
|
|
|
+ page = 1
|
|
|
+ while True:
|
|
|
+ params = {
|
|
|
+ 'consumerkey': consumerkey,
|
|
|
+ 'secretkey': secretkey,
|
|
|
+ 'timestamp': int(1601481600),
|
|
|
+ 'siteid': siteid,
|
|
|
+ 'pageSize': 1000,
|
|
|
+ 'starttime': starttime,
|
|
|
+ 'endtime': endtime,
|
|
|
+ 'page': page,
|
|
|
+ 'paystatus': 1}
|
|
|
+
|
|
|
+ response_result_json = get_wending_json_object(url, params)
|
|
|
+
|
|
|
+ order_item_list = response_result_json['data']['rechargeList']
|
|
|
+
|
|
|
+ for x in order_item_list:
|
|
|
+ y = {}
|
|
|
+
|
|
|
+ y['platform'] = '文鼎'
|
|
|
+ y['channel'] = x['wx_mpName'] ## 公众号名称
|
|
|
+ y['channel_id'] = x['wx_originalId'] ## 公众号id
|
|
|
+ y['from_novel'] = x['bookTitle'] ## 小说名称
|
|
|
+ y['user_id'] = x['userId'] ## 付费用户uid
|
|
|
+ y['stage'] = stage ## 期数
|
|
|
+ createTime = time.strftime("%Y-%m-%d %H:%M:%S",
|
|
|
+ time.localtime(x['createTime'] // 1000)) ## 时间戳 》struct_time 》标准时间
|
|
|
+ y['order_time'] = createTime ## 订单生成时间
|
|
|
+ y['amount'] = x['money'] / 100 ## 原数据单位:分
|
|
|
+ uid_reg_time = time.strftime("%Y-%m-%d %H:%M:%S",
|
|
|
+ time.localtime(x['userRegisterTime'] // 1000)) ## 13位时间戳 》标准时间
|
|
|
+ y['reg_time'] = uid_reg_time ## 用户注册时间
|
|
|
+ y['order_id'] = x['ewTradeId'] ## 订单id
|
|
|
+ y['date'] = createTime
|
|
|
+ y = sorted(y.items(), key=lambda item: item[0])
|
|
|
+ y = dict(y)
|
|
|
+ y = tuple(y.values())
|
|
|
+ order_list = order_list + ((y),)
|
|
|
+
|
|
|
+ if len(order_item_list) < 1000:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ page += 1
|
|
|
+
|
|
|
+ print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
|
|
|
+ return order_list
|
|
|
+
|
|
|
+def batch_save_order_new(data):
|
|
|
+ if data is None or len(data) == 0:
|
|
|
+ print('数据为空,不执行数据库操作!')
|
|
|
+ else:
|
|
|
+ sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
|
|
|
+ connect = MySQLConnection()
|
|
|
+ try:
|
|
|
+ num = connect.batch(sql, data)
|
|
|
+ # 提交
|
|
|
+ connect.commit()
|
|
|
+ print('订单数据最终入库【{num}】条'.format(num=num))
|
|
|
+ except Exception as e:
|
|
|
+ print('订单数据入库失败:', e)
|
|
|
+ finally:
|
|
|
+ connect.close()
|
|
|
+
|
|
|
+
|
|
|
+def start_order_job_wending(st, et):
|
|
|
+ account_list = get_wending_account_list()
|
|
|
+ da = get_wending_order(st, et, account_list)
|
|
|
+ batch_save_order_new(da)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ start_order_job_wending()
|
|
|
+
|