| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 | 
							- import hashlib
 
- import time  
 
- import datetime
 
- from concurrent.futures import ProcessPoolExecutor
 
- import requests
 
- from util import date_util
 
- from util import platform_config_util
 
- from util import robust_util
 
- from util.MySQLConnection import MySQLConnection
 
- def md5(s):
 
-     md5 = hashlib.md5()
 
-     md5.update(s.encode("utf-8"))
 
-     return md5.hexdigest()
 
- def getSelfDateStr(times=time.time(),date_format='%Y%m%d'):
 
-     """
 
-     ## 20201028添加,阳光接口,文鼎接口,日期参数请求格式20201028,一日一拉api数据
 
-     description:  获取指定时间戳
 
-     time:         秒 默认当前时间
 
-     return:       返回指定时间戳的前一日日期 。 比如 :接收20190512号的时间戳,返回 20190513 -> str
 
-     tips:         一天86400秒
 
-     """
 
-     timestamps = str(time.strftime(date_format,time.localtime(times)))
 
-     return timestamps
 
- def get_wending_account_list():
 
-     """
 
-     des cription:  文鼎账号列表
 
-     return:       [['consumerkey', 'secretkey', 'siteid', 'stage', 'account']] ->list
 
-     """
 
-     return platform_config_util.get_account_list('文鼎', 'wending_account_config.csv')
 
- def get_wending_order(st,et,account_list):
 
-     total_order_list = ()
 
-     start_exec_seconds = date_util.getCurrentSecondTime()
 
-     futures = []
 
-     for account in account_list:
 
-         futures.append(get_wending_order_task(st, et, account))
 
-     for future in futures:
 
-         if len(future)>0:
 
-             total_order_list = future + total_order_list
 
-     print('文鼎订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
 
-     print(total_order_list)
 
-     return total_order_list
 
- def get_wd_account_siteid_list(account):
 
-     url = 'https://bi.reading.163.com/dist-api/siteList'
 
-     consumerkey = account[0]
 
-     secretkey = account[1]
 
-     stage = account[3]
 
-     timestamp = int(time.time()*1000)
 
-   
 
-     siteid_params = {
 
-         "consumerkey":consumerkey,
 
-         'secretkey':secretkey,
 
-         'timestamp':timestamp,
 
-     }
 
-     sorted_data = sorted(siteid_params.items(),reverse = False)
 
-     s=""
 
-     for k,v in sorted_data:
 
-         s = s+str(k)+"="+str(v)
 
-     sign = md5(s).lower()
 
-     siteid_params['sign'] = sign
 
-     consumerkey = siteid_params['consumerkey']
 
-     timestamp = siteid_params['timestamp']
 
-     parameter = 'consumerkey='+str(consumerkey)+'×tamp='+str(timestamp)+'&sign='+str(sign)
 
-     get_url = url+"?"+parameter
 
-     while True:
 
-         r = requests.get(url=get_url)
 
-         if r.status_code==200:
 
-             break
 
-     try:
 
-         id_key_list = r.json()['data']
 
-     except:
 
-         return []
 
-     mpid_list = []
 
-     try:
 
-       for id_key_val in id_key_list:
 
-           mpid = dict(id_key_val)["mpId"]
 
-           mpid_list.append(mpid)
 
-     except Exception as e:
 
-         print(stage,'站点查询返回结果:',r.json())
 
-     return mpid_list
 
- def get_wending_json_object(url,params):
 
-     params['timestamp'] = int(time.time()*1000)
 
-     sorted_data = sorted(params.items(),reverse = False)
 
-     s=""
 
-     for k,v in sorted_data:
 
-       s = s+str(k)+"="+str(v)
 
-     sign = md5(s).lower()
 
-     params['sign'] = sign
 
-     consumerkey = params['consumerkey']
 
-     secretkey = params['secretkey']
 
-     timestamp = params['timestamp']
 
-     siteid = params['siteid']
 
-     pageSize = params['pageSize']
 
-     starttime = params['starttime']
 
-     endtime = params['endtime']
 
-     page = params['page']
 
-     paystatus = params['paystatus']
 
-     ## +'&secretkey='+str(secretkey)
 
-     parameter = 'consumerkey='+str(consumerkey)+'×tamp='+str(timestamp)+'&siteid='+str(siteid)+'&pageSize='+str(pageSize)\
 
-               +'&starttime='+str(starttime)+'&endtime='+str(endtime)+'&page='+str(page)+'&paystatus='+str(paystatus)+'&sign='+str(sign)
 
-     global get_url
 
-     get_url = url+"?"+parameter
 
-     while True:
 
-         r= requests.get(url=get_url)
 
-         if r.status_code==200:
 
-             break
 
-         else:
 
-             time.sleep(1)
 
-             print("请求连接出错,等待1s...")
 
-     response_result_json=r.json()
 
-     del params['sign']
 
-     return response_result_json
 
- def get_wending_order_task(st,et,account):
 
-     order_list = ()
 
-     url = 'https://bi.reading.163.com/dist-api/rechargeList'
 
-     consumerkey = account[0][1:]
 
-     print(consumerkey)
 
-     secretkey = account[1]
 
-     siteid=account[2]
 
-     stage = account[3]
 
-     siteid_list = get_wd_account_siteid_list(account)
 
-     print(siteid_list)
 
-     if len(siteid_list) == 0:
 
-         siteid_list.append(siteid)
 
-     starttime = getSelfDateStr(st,'%Y%m%d%H%M')
 
-     endtime = getSelfDateStr(et,'%Y%m%d%H%M')
 
-     for siteid in siteid_list:
 
-         page = 1
 
-         while True:
 
-             params = {
 
-             'consumerkey': consumerkey,
 
-             'secretkey':secretkey,
 
-             'timestamp':int(1601481600),
 
-             'siteid':siteid,
 
-             'pageSize':1000,
 
-             'starttime':starttime,
 
-             'endtime':endtime,
 
-             'page':page,
 
-             'paystatus':1}
 
-             response_result_json = get_wending_json_object(url,params)
 
-             order_item_list = response_result_json['data']['rechargeList']
 
-             for x in order_item_list:
 
-                 y={}
 
-                 y['date'] = (int(x['payTime']//1000)+ 8 * 3600) // 86400 * 86400 - 8 * 3600    ## 网易的是13位时间戳
 
-                 y['platform'] = '文鼎'
 
-                 y['channel'] = x['wx_mpName']            ## 公众号名称
 
-                 y['channel_id'] = x['wx_originalId']     ## 公众号id
 
-                 y['from_novel'] = x['bookTitle']         ## 小说名称
 
-                 y['user_id'] = x['userId']               ## 付费用户uid
 
-                 y['stage'] = stage                       ## 期数
 
-                 createTime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['createTime']//1000))  ## 时间戳 》struct_time 》标准时间
 
-                 y['order_time']= createTime              ## 订单生成时间
 
-                 y['amount']=x['money']/100               ## 原数据单位:分
 
-                 uid_reg_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(x['userRegisterTime']//1000))     ## 13位时间戳 》标准时间
 
-                 y['reg_time']= uid_reg_time              ## 用户注册时间
 
-                 y['order_id']= x['ewTradeId']            ## 订单id
 
-                 y = sorted(y.items(), key=lambda item:item[0])
 
-                 y = dict(y)
 
-                 y = tuple(y.values())
 
-                 order_list = order_list+((y),)
 
-             if len(order_item_list)<1000:
 
-                 break
 
-             else:
 
-                 page+=1
 
-     print(f"文鼎数据日期-{starttime}到{endtime}-期数-{stage}-获取数据-{len(order_list)}条")
 
-     return order_list
 
- def batch_save_order(data):
 
-     if data is None or len(data) == 0:
 
-         print('数据为空,不执行数据库操作!')
 
-     else:
 
-         sql = 'INSERT IGNORE INTO quchen_text.order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
 
-         connect = MySQLConnection()
 
-         try:
 
-             num = connect.batch(sql, data)
 
-             # 提交
 
-             connect.commit()
 
-             print('订单数据最终入库【{num}】条'.format(num=num))
 
-         except Exception as e:
 
-             print('订单数据入库失败:', e)
 
-         finally:
 
-             connect.close()
 
- def batch_save_order_new(data):
 
-     if data is None or len(data) == 0:
 
-         print('数据为空,不执行数据库操作!')
 
-     else:
 
-         sql = 'INSERT IGNORE INTO quchen_text.ods_order(amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
 
-         connect = MySQLConnection()
 
-         try:
 
-             num = connect.batch(sql, data)
 
-             # 提交
 
-             connect.commit()
 
-             print('订单数据最终入库【{num}】条'.format(num=num))
 
-         except Exception as e:
 
-             print('订单数据入库失败:', e)
 
-         finally:
 
-             connect.close()
 
- def start_order_job():
 
-     st_unix = date_util.get_n_day(n=-1, is_timestamp=1)
 
-     et_unix = date_util.get_n_day(is_timestamp=1)
 
-     account_list = get_wending_account_list()
 
-     print(account_list)
 
-     batch_save_order(get_wending_order(st_unix, et_unix,account_list))
 
-     batch_save_order_new(get_wending_order(st_unix, et_unix,account_list))
 
- if __name__ == '__main__':
 
-     start_order_job()
 
 
  |