#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
__title__ = '每日凌晨空闲时检查本地数据库中的订单数据是否和平台昨天总订单一致'

Nightly job (run in the idle early-morning hours) that checks whether the
orders stored in the local database match each platform's order total for
yesterday, and re-imports a platform's orders when they do not.

@Time    : 2020/9/26 19:44
@Author  : Kenny-PC
@Software: PyCharm
"""
import datetime
import hashlib
import math
import random
import time
import traceback
from concurrent.futures import ProcessPoolExecutor
from urllib import parse

import requests

import account_list_zwg as al
from MySQLConnection import MySQLConnection
from util import date_util
from util import platform_util

# Timeout (seconds) for every outbound HTTP call. Without one, ``requests``
# can block a worker process forever on a stalled connection.
_HTTP_TIMEOUT = 60

_SECONDS_PER_DAY = 86400
# UTC+8 offset used to normalise the ``date`` column to Beijing midnight.
_CST_OFFSET = 8 * 3600

# Column order of quchen_text.`order_zwg` as listed in mysql_insert_order.
# Every order row is a tuple in exactly this order, which is the alphabetical
# order of the field names (so index 5 is order_id).
_ORDER_COLUMNS = ('amount', 'channel', 'channel_id', 'date', 'from_novel', 'order_id',
                  'order_time', 'platform', 'reg_time', 'stage', 'user_id')


def md5value(s):
    """Return the lowercase hex MD5 digest of ``s`` encoded as UTF-8."""
    md5 = hashlib.md5()
    md5.update(s.encode("utf-8"))
    return md5.hexdigest()


def _cst_day_start(ts):
    """Return the Unix timestamp of 00:00 (UTC+8) of the day containing ``ts``."""
    return (ts + _CST_OFFSET) // _SECONDS_PER_DAY * _SECONDS_PER_DAY - _CST_OFFSET


def _date_column_from_text(text):
    """Return the ``date`` column value for a 'YYYY-MM-DD...' time string.

    The first 10 characters are parsed as a local-time date, then normalised
    to 00:00 UTC+8 of that day.
    """
    day = datetime.datetime.strptime(text[0:10], "%Y-%m-%d")
    return _cst_day_start(int(time.mktime(day.timetuple())))


def _to_row(order):
    """Return ``order`` (a dict keyed by column name) as a tuple ordered like _ORDER_COLUMNS."""
    return tuple(order[column] for column in _ORDER_COLUMNS)


def _collect_results(futures):
    """Concatenate the order tuples produced by ``futures`` into one tuple.

    Later-submitted tasks come first; this is the same order the former
    ``total = result + total`` accumulation produced.
    """
    rows = []
    for future in reversed(futures):
        rows.extend(future.result())
    return tuple(rows)


# 《1》阅文 (Yuewen)
def get_yuewen_order(st, et):
    """Fetch Yuewen orders between ``st`` and ``et`` for every configured account.

    One task per account runs in a 5-process pool.

    :param st: range start, Unix seconds
    :param et: range end, Unix seconds
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    account_list = platform_util.get_yuewen_account_list()
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_yuewen_order_task, st, et, account)
                   for account in account_list]
    total_order_list = _collect_results(futures)
    print('阅文订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_yuewen_order_task(st, et, account):
    """Fetch the paid Yuewen recharge orders of one account between ``st`` and ``et``.

    The range is queried in windows of at most one day. Each window is paged
    100 rows at a time; pages after the first must echo back the previous
    response's min_id / max_id / total_count / page.

    :param st: range start, Unix seconds
    :param et: range end, Unix seconds
    :param account: sequence whose first two items are (email, app secret)
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    order_list = []
    email = account[0]
    appsecert = account[1]
    url = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
    version = 1
    order_status = 2  # 2 = paid
    page_count = 100  # rows per page
    start_time = st
    while start_time < et:
        end_time = min(start_time + _SECONDS_PER_DAY, et)
        page = 1
        last_min_id = ''
        last_max_id = ''
        total_count = ''
        last_page = ''
        while True:
            params = {
                'email': email,
                'version': version,
                'timestamp': int(time.time()),
                'start_time': start_time,
                'end_time': end_time,
                'page': page,
                'order_status': order_status,
            }
            if page > 1:
                params['last_min_id'] = last_min_id
                params['last_max_id'] = last_max_id
                params['total_count'] = total_count
                params['last_page'] = last_page
            # Signature: upper-case MD5 of the secret followed by every
            # key/value pair concatenated in key order.
            str_params = ''.join(str(k) + str(v) for k, v in sorted(params.items()))
            params['sign'] = md5value(appsecert + str_params).upper()

            response_result_json = requests.get(url=url, params=params,
                                                timeout=_HTTP_TIMEOUT).json()
            # The interface is rate limited: an identical query may be issued
            # only once per minute. Any non-zero code ends this window.
            if response_result_json['code'] != 0:
                print('阅文查询充值接口异常:', response_result_json, '参数', params)
                break

            response_data = response_result_json['data']
            total_count = response_data['total_count']
            if total_count == 0:
                # Nothing in this window; re-sending the identical query would
                # only trip the once-per-minute limit.
                break
            last_min_id = response_data['min_id']
            last_max_id = response_data['max_id']
            last_page = response_data['page']

            for order_item in response_data['list']:
                order_time = order_item['order_time']
                dtime = datetime.datetime.strptime(order_time, "%Y-%m-%d %H:%M:%S")
                order_time_unix = int(time.mktime(dtime.timetuple()))
                order_id = order_item['order_id']
                if date_util.checkInterval(start_time, end_time, order_time_unix) == False:
                    print('阅文账号【{key}】, 查询时间【{start_time} - {end_time}】,有不符合该时间范围的订单,订单Id【{order_id}】的时间为【{order_time}】'
                          .format(key=email, start_time=date_util.getSecondsToDatetime(start_time),
                                  end_time=date_util.getSecondsToDatetime(end_time), order_id=order_id,
                                  order_time=order_time))
                    continue
                order_list.append(_to_row({
                    'date': _cst_day_start(order_time_unix),
                    'platform': '阅文',
                    'channel': order_item['app_name'],
                    'from_novel': order_item['book_name'],
                    'user_id': order_item['openid'],
                    'stage': '',
                    'channel_id': 0,
                    'order_time': order_time,
                    'amount': order_item['amount'],
                    'reg_time': order_item['reg_time'],
                    'order_id': order_id,
                }))

            if int(page) >= math.ceil(total_count / int(page_count)):
                break
            page = page + 1
        start_time = start_time + _SECONDS_PER_DAY  # next day
    return tuple(order_list)


# 《2》掌读 (Zhangdu)
def get_zhangdu_order(st, et):
    """Fetch Zhangdu orders between ``st`` and ``et`` for every configured account.

    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    account_list = platform_util.get_zhangdu_account_list()
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_zhangdu_order_task, st, et, account)
                   for account in account_list]
    total_order_list = _collect_results(futures)
    print('掌读订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_zhangdu_order_task(st, et, account):
    """Fetch the paid Zhangdu orders of one account between ``st`` and ``et``.

    The range is split into 90-day windows. For each window, the page count is
    read first and then every page is fetched.

    :param account: sequence (uid, app secret, channel name)
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    order_list = []
    url = 'https://api.zhangdu520.com/channel/getorder'
    uid = account[0]
    appsecert = account[1]
    channel = account[2]
    # One signature per task: it only covers uid, secret and timestamp.
    timestamp = int(time.time())
    sign = md5value(str(uid) + '&' + appsecert + '&' + str(timestamp))
    timespace = 90 * 3600 * 24
    for window_index in range((et - st) // timespace + 1):  # time windows
        # Window bounds are derived from the index, so an empty window can no
        # longer cause the next one to repeat it.
        starttime = st + window_index * timespace
        endtime = min(et, starttime + timespace)
        if window_index > 0:
            print('掌读跨天数查询:', window_index)
        base_params = {
            'uid': uid,
            'timestamp': timestamp,
            'sign': sign,
            'starttime': starttime,
            'endtime': endtime,
        }
        response_result_json = requests.get(url=url, params=base_params,
                                            timeout=_HTTP_TIMEOUT).json()
        page_total = response_result_json['data']['pageCount']
        for page in range(1, page_total + 1):  # paging
            page_json = requests.get(url=url, params=dict(base_params, page=page),
                                     timeout=_HTTP_TIMEOUT).json()
            if 'data' not in page_json:
                continue
            for item in page_json['data']['list']:
                if item['status'] != '1':
                    continue
                order_list.append(_to_row({
                    'amount': item['amount'],
                    'channel_id': uid,
                    'order_id': str(item['orderno']),
                    'order_time': item['ctime'],
                    'user_id': item['openid'],
                    'platform': '掌读',
                    'channel': channel,
                    'reg_time': item['regtime'],
                    'from_novel': '',
                    'stage': '',
                    'date': _cst_day_start(int(item['ctime'])),
                }))
    return tuple(order_list)


# 《3》花生 (Huasheng)
def get_huasheng_order(st, et):
    """Fetch Huasheng orders between ``st`` and ``et`` for every account and merchant.

    The merchant list of each account is fetched here; one task per merchant
    runs in a 5-process pool.

    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    account_list = platform_util.get_huasheng_account_list()
    url = 'https://vip.rlcps.cn/api/getMerchants'
    futures = []
    with ProcessPoolExecutor(max_workers=5) as executor:
        for account in account_list:
            apiKey = str(account[0])
            apiSecurity = account[1]
            timestamp = str(int(time.time()))
            sign = md5value(apiKey + timestamp + apiSecurity).upper()
            params = {
                'apiKey': apiKey,
                'apiSecurity': apiSecurity,
                'timestamp': timestamp,
                'sign': sign,
            }
            response_result_json = requests.post(url, data=params, timeout=_HTTP_TIMEOUT).json()
            if 'data' not in response_result_json:
                continue  # account has no merchants (or the request failed)
            for merchant in response_result_json['data']:
                futures.append(executor.submit(get_huasheng_order_task, st, et, account, merchant))
    total_order_list = _collect_results(futures)
    print('花生订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_huasheng_order_task(st, et, account, merchant):
    """Fetch the paid Huasheng orders of one merchant for each day touching [st, et).

    The API is queried by calendar date (local time). Only dates that
    intersect [st, et) are requested, so a midnight ``et`` does not pull in
    that whole following day.

    :param account: sequence (apiKey, apiSecurity, stage)
    :param merchant: dict with 'merchant_id' and 'merchant_name'
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    order_list = []
    apiKey = str(account[0])
    apiSecurity = account[1]
    stage = account[2]
    timestamp = str(int(time.time()))
    order_url = 'https://vip.rlcps.cn/api/orderList'
    merchant_id = merchant['merchant_id']
    merchant_name = merchant['merchant_name']
    limit = 500
    last_day = datetime.date.fromtimestamp(et - 1)
    start_time = st
    while datetime.date.fromtimestamp(start_time) <= last_day:
        date = time.strftime("%Y-%m-%d", time.localtime(start_time))
        sign = md5value(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper()
        page = 1
        while True:
            order_params = {
                'apiKey': apiKey,
                'apiSecurity': apiSecurity,
                'timestamp': timestamp,
                'date': date,
                'merchant_id': merchant_id,
                'sign': sign,
                'page': page,
                'limit': limit,
            }
            response_result_json = requests.post(order_url, data=order_params,
                                                 timeout=_HTTP_TIMEOUT).json()
            if 'data' not in response_result_json or len(response_result_json['data']) == 0:
                break
            total_count = response_result_json['count']
            for order_item in response_result_json['data']:
                if order_item['order_status'] != 1:  # 1 = paid
                    continue
                order_list.append(_to_row({
                    'user_id': order_item['openid'],
                    'order_id': order_item['trans_id'],
                    'order_time': order_item['pay_at'],
                    'reg_time': order_item['join_at'],
                    # TODO: unify Huasheng time handling - 'date' is the UTC+8
                    # midnight of the queried day, not derived from pay_at.
                    'date': _cst_day_start(start_time),
                    'channel': merchant_name,
                    'channel_id': merchant_id,
                    'platform': '花生',
                    'stage': stage,
                    'from_novel': order_item['book_name'],
                    'amount': order_item['amount'],
                }))
            if int(page) >= math.ceil(total_count / int(limit)):
                break
            page = page + 1
        start_time = start_time + _SECONDS_PER_DAY  # next day
    return tuple(order_list)


# 《4》掌中云 (Zhangzhongyun)
def get_zzy_order(st, et):
    """Fetch Zhangzhongyun orders between ``st`` and ``et`` for every account and channel.

    Accounts come from ``account_list_zwg.zzy_account_list``. The sub-channel
    list of each account is fetched here; one task per channel runs in a
    5-process pool.

    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    account_list = al.zzy_account_list
    url = 'https://openapi.818tu.com/partners/channel/channels/list?'
    futures = []
    with ProcessPoolExecutor(max_workers=5) as executor:
        for account in account_list:
            key = account[0]
            secert = account[1]
            sign = md5value(secert + 'key=' + key)
            params = 'key=' + key + '&sign=' + sign
            # Fetch the account's sub-channel list.
            response_result_json = requests.get(url + params, timeout=_HTTP_TIMEOUT).json()
            if 'data' not in response_result_json:
                continue
            # Pull the orders of each channel separately.
            for channel in response_result_json['data']['items']:
                futures.append(executor.submit(get_zzy_order_task, st, et, account, channel))
    total_order_list = _collect_results(futures)
    print('掌中云订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_zzy_order_task(st, et, account, channel):
    """Fetch the paid orders of one Zhangzhongyun channel with st <= created_at < et.

    :param account: sequence (key, secret, stage)
    :param channel: dict with 'id' and 'nickname'
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    # Zhangzhongyun expects its own time format, so convert the bounds first.
    st = platform_util.get_zhangzhongyun_format_time(st)
    et = platform_util.get_zhangzhongyun_format_time(et)
    order_list = []
    key = account[0]
    secert = account[1]
    stage = account[2]
    order_url = 'https://openapi.818tu.com/partners/channel/orders/list?'
    channel_id = channel['id']
    channel_name = channel['nickname']
    status = str(1)
    per_page = str(1000)
    get_time = st
    limit_time = et
    gte = parse.urlencode({'created_at[gte]': get_time})  # created_at >= start
    lt = parse.urlencode({'created_at[lt]': limit_time})  # created_at < end
    page = 1
    while True:
        sign = md5value(secert + 'channel_id=' + str(channel_id) + '&created_at[gte]=' + get_time
                        + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str(page)
                        + '&per_page=' + per_page + '&status=' + status)
        params = ('channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str(page)
                  + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign)
        response_result_json = requests.get(order_url + params, timeout=_HTTP_TIMEOUT).json()
        if 'data' not in response_result_json:
            break
        total_count = response_result_json['data']['count']  # total number of orders
        for order_item in response_result_json['data']['items']:
            if str(order_item['from_novel_id']) != 'None':
                from_novel = order_item['from_novel']['title']
            else:
                from_novel = 'None'
            order_list.append(_to_row({
                'user_id': str(order_item['member']['openid']),
                'channel': channel_name,
                'reg_time': order_item['member']['created_at'],
                'channel_id': channel_id,
                'amount': round(order_item['price'] / 100, 2),
                'order_id': str(order_item['id']),
                'order_time': order_item['created_at'],
                'platform': '掌中云',
                'stage': stage,
                'date': _date_column_from_text(order_item['created_at']),
                'from_novel': from_novel,
            }))
        if int(page) >= math.ceil(total_count / int(per_page)):
            break
        page = int(page) + 1
    return tuple(order_list)


# 《5》悠书阁 (Youshuge)
def get_ysg_order(st, et):
    """Fetch Youshuge orders between ``st`` and ``et`` for every configured account.

    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    account_list = platform_util.get_youshuge_account_list()
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_ysg_order_task, st, et, account)
                   for account in account_list]
    total_order_list = _collect_results(futures)
    print('悠书阁订单数量:', len(total_order_list), '执行时长(秒):',
          date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order_list


def get_ysg_order_task(st, et, account):
    """Fetch the paid Youshuge orders of one account for dates date(st)..date(et).

    Results are paged 100 rows at a time. Page 1 is fetched once; pages 2..N
    are fetched only when the reported count exceeds one page.

    :param account: sequence (host_name, channel_id, secret key, channel name, stage)
    :return: tuple of order rows (see _ORDER_COLUMNS)
    """
    url = 'https://novel.youshuge.com/v2/open/orders'
    # More than 100 rows requires paging (as far as can be told from the
    # platform's behaviour).
    max_page_size = 100
    host_name = account[0]
    channel_id = int(account[1])
    secert_key = account[2]
    channel = account[3]
    stage = account[4]
    start_date = time.strftime("%Y-%m-%d", time.localtime(st))
    end_date = time.strftime("%Y-%m-%d", time.localtime(et))

    def fetch_page(page):
        # Each request is signed with a fresh timestamp and its page number.
        timestamp = int(time.time())
        str1 = ('channel_id=' + str(channel_id) + '&end_date=' + end_date + '&host_name=' + host_name
                + '&page=' + str(page) + '&pay_status=1' + '&start_date=' + start_date
                + '&time=' + str(timestamp) + '&key=' + secert_key)
        data = {
            'sign': md5value(str1).upper(),
            'host_name': host_name,
            'time': timestamp,
            'channel_id': channel_id,
            'page': page,
            'pay_status': 1,
            'start_date': start_date,
            'end_date': end_date,
        }
        return requests.post(url, data, timeout=_HTTP_TIMEOUT)

    respone = fetch_page(1)
    if respone.status_code == 400:
        print('respone', respone)
    result_json = respone.json()
    first_page_order = build_ysg_order_data(channel, channel_id, result_json, stage)
    if len(first_page_order) == 0:
        return first_page_order

    order_list = list(first_page_order)
    total_count = result_json['data'][0]['count']
    page_total = (total_count - 1) // max_page_size + 1
    # Page 1 is already in order_list; fetch only the remaining pages.
    for page in range(2, page_total + 1):
        order_list.extend(build_ysg_order_data(channel, channel_id, fetch_page(page).json(), stage))
    return tuple(order_list)


def build_ysg_order_data(channel, channel_id, result_json, stage):
    """Convert one Youshuge response into a tuple of order rows (see _ORDER_COLUMNS).

    A missing or empty 'data' field yields an empty tuple.
    """
    rows = []
    for item in result_json.get('data') or ():
        rows.append(_to_row({
            'date': _date_column_from_text(item['create_time']),
            'order_id': item['order_num'],
            'amount': round(int(item['price']) / 100, 2),
            'order_time': item['create_time'],
            'channel': channel,
            'from_novel': item['book_name'],
            'stage': stage,
            'user_id': item['openid'],
            'channel_id': channel_id,
            'platform': '悠书阁',
            'reg_time': item['reg_time'],
        }))
    return tuple(rows)


def mysql_insert_order(data):
    """Write order rows into quchen_text.`order_zwg`.

    Rows are written with REPLACE INTO, so an order whose order_id (the
    table's primary key) already exists overwrites the stored row instead of
    duplicating it. Errors are printed, not raised.

    :param data: sequence of row tuples ordered like _ORDER_COLUMNS
    """
    if not data:
        print('数据为空,不执行数据库操作!')
        return
    sql = 'replace into quchen_text.`order_zwg` (amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    connect = MySQLConnection()
    try:
        num = connect.batch(sql, data)
        connect.commit()
        print(num, '条订单数据入库成功')
    except Exception as e:
        traceback.print_exc()
        print('订单数据入库失败:', e)
    finally:
        connect.close()


def mysql_select_platform_order_count(date):
    """Return per-platform order counts stored for ``date``.

    :param date: value of the ``date`` column (UTC+8 midnight, Unix seconds)
    :return: the query result (rows with 'platform' and 'num'), or [] on error
    """
    sql = 'SELECT platform, COUNT(1) AS num FROM quchen_text.`order_zwg` WHERE date = %s GROUP BY platform'
    connect = MySQLConnection()
    try:
        return connect.query(sql, date)
    except Exception as e:
        traceback.print_exc()
        print('各平台的订单数据查询失败:', e)
        return []
    finally:
        connect.close()


def start_all_job():
    """Reconcile yesterday's stored orders with every platform and re-import on mismatch.

    * No stored rows at all: import every platform.
    * A platform whose stored count differs from its live count: re-import it.
    * A platform with no stored rows: import it.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    st_unix = date_util.getYesterdayStartTime()
    et_unix = date_util.getTodayStartTime()
    print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
    print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))

    # Insertion order is the order missing platforms are back-filled in.
    fetchers = {
        '阅文': get_yuewen_order,
        '悠书阁': get_ysg_order,
        '掌读': get_zhangdu_order,
        '掌中云': get_zzy_order,
        '花生': get_huasheng_order,
    }

    # NOTE(review): assumes getYesterdayStartTime() equals the UTC+8 midnight
    # stored in the `date` column (i.e. the server runs in UTC+8) - confirm.
    platform_order_num_list = mysql_select_platform_order_count(st_unix)
    if not platform_order_num_list:
        print('本地库中没有任何数据,现在全平台补全')
        for platform in ('掌中云', '阅文', '花生', '悠书阁', '掌读'):
            mysql_insert_order(fetchers[platform](st_unix, et_unix))
    else:
        missing_platforms = list(fetchers)
        for platform_order_num in platform_order_num_list:
            platform = str(platform_order_num['platform'])
            fetch = fetchers.get(platform)
            if fetch is None:
                print('发现未知平台数据!', platform_order_num)
                continue
            num = int(platform_order_num['num'])
            if platform in missing_platforms:
                missing_platforms.remove(platform)
            order_list = fetch(st_unix, et_unix)
            if len(order_list) != num:
                print(platform + '数据实际订单和已经入库数据差异:', len(order_list) - num)
                mysql_insert_order(order_list)

        for platform in missing_platforms:
            print(platform + '没有数据')
            mysql_insert_order(fetchers[platform](st_unix, et_unix))

    print('订单检查执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)


if __name__ == '__main__':
    start_all_job()
    # Intended to be scheduled daily; the former APScheduler setup ran it at
    # 03:35 and 04:35:
    # scheduler = BlockingScheduler()
    # scheduler.add_job(start_all_job, 'cron', hour='3-4', minute='35')
    # scheduler.start()