#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Nightly order reconciliation job.

During the early-morning idle window, re-fetch yesterday's paid orders from
every content platform, report any difference against the number of rows
already stored locally, and (re)write the fetched orders with REPLACE INTO.

@Time    : 2020/9/26 19:44
@Author  : Kenny-PC
@Software: PyCharm
"""
import datetime
import hashlib
import math
import random
import time
from concurrent.futures import ProcessPoolExecutor
from urllib import parse

import requests
from apscheduler.schedulers.blocking import BlockingScheduler

import account_list_zwg as al
from MySQLConnection import MySQLConnection
from util import date_util
from util import platform_util

_ONE_DAY = 86400
_UTC8_OFFSET = 8 * 3600
# Number of worker processes used to query accounts/merchants/channels in parallel.
_MAX_WORKERS = 5


def md5value(s):
    """Return the lowercase hex MD5 digest of the UTF-8 encoding of *s*."""
    md5 = hashlib.md5()
    md5.update(s.encode("utf-8"))
    return md5.hexdigest()


def _utc8_day_start(ts):
    """Floor unix timestamp *ts* to 00:00 of its calendar day in UTC+8."""
    return (ts + _UTC8_OFFSET) // _ONE_DAY * _ONE_DAY - _UTC8_OFFSET


def _local_str_to_ts(text, fmt):
    """Parse *text* with *fmt* and return integer unix seconds.

    time.mktime interprets the parsed value in the host's local time zone.
    """
    return int(time.mktime(datetime.datetime.strptime(text, fmt).timetuple()))


def _to_row(record):
    """Flatten an order dict into a tuple of its values ordered by key name.

    Alphabetical key order is exactly the column order used by
    mysql_insert_order: amount, channel, channel_id, date, from_novel,
    order_id, order_time, platform, reg_time, stage, user_id.
    """
    return tuple(record[key] for key in sorted(record))


def _merge_results(futures):
    """Concatenate the tuple results of *futures* into one tuple.

    Later futures' rows come first (same ordering as the previous
    accumulate-by-prepending code). future.result() re-raises any exception
    raised inside the worker.
    """
    chunks = [future.result() for future in futures]
    return tuple(row for chunk in reversed(chunks) for row in chunk)


# ---- (1) Yuewen ----
_YUEWEN_URL = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
_YUEWEN_PAGE_SIZE = 100


def get_yuewen_order(st, et):
    """Fetch Yuewen paid orders between unix times *st* and *et* for all accounts.

    Each account in al.yuewen_account_list is queried in a worker process.
    Returns a tuple of order rows (see _to_row).
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    with ProcessPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        futures = [executor.submit(get_yuewen_order_task, st, et, account)
                   for account in al.yuewen_account_list]
    total_order = _merge_results(futures)
    print('阅文订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order


def _yuewen_signed_params(appsecert, params):
    """Return a copy of *params* with the Yuewen 'sign' field appended.

    sign = upper(md5(secret + k1 + v1 + k2 + v2 ...)) over params sorted by key.
    """
    payload = ''.join(str(key) + str(value) for key, value in sorted(params.items()))
    return dict(params, sign=md5value(appsecert + payload).upper())


def _build_yuewen_row(x):
    """Convert one Yuewen charge-log entry into an order row."""
    order_ts = _local_str_to_ts(x['order_time'], "%Y-%m-%d %H:%M:%S")
    return _to_row({
        'date': _utc8_day_start(order_ts),
        'platform': '阅文',
        'channel': x['app_name'],
        'from_novel': x['book_name'],
        'user_id': x['openid'],
        'stage': '',
        'channel_id': 0,
        'order_time': x['order_time'],
        'amount': x['amount'],
        'reg_time': x['reg_time'],
        'order_id': x['order_id'],
    })


def get_yuewen_order_task(st, et, account):
    """Fetch one Yuewen account's paid orders, one day-long window at a time.

    account[0] is the login email and account[1] the signing secret.
    The API only allows an identical query once per minute, so the task
    sleeps 60-70 s before requesting pages >= 2 of a day.
    A non-zero 'code' on a day's first page stops processing of all remaining
    days; on a later page it abandons the remaining pages of that day only.
    """
    order_list = []
    email = account[0]
    appsecert = account[1]
    version = 1
    order_status = 2
    start_time = st
    for _ in range(int((et - st) / _ONE_DAY)):
        end_time = min(start_time + _ONE_DAY, et)
        first_params = {
            'email': email,
            'version': version,
            'timestamp': int(time.time()),
            'start_time': start_time,
            'end_time': end_time,
            'page': 1,
            'order_status': order_status,
        }
        first = requests.get(url=_YUEWEN_URL, params=_yuewen_signed_params(appsecert, first_params)).json()
        # Rate-limit error looks like {'code': 10408, 'msg': '调用频率超限'}.
        if first['code'] != 0:
            print('阅文查询充值接口异常:', first)
            break
        page_data = first['data']
        total_count = page_data['total_count']
        if total_count > 0:
            order_list.extend(_build_yuewen_row(x) for x in page_data['list'])
        if total_count > _YUEWEN_PAGE_SIZE:
            sleep_seconds = random.randint(60, 70)
            print('阅文获取订单数据线程休眠', sleep_seconds, '秒,因为该接口有一分钟的限制')
            time.sleep(sleep_seconds)
            for page in range(2, math.ceil(total_count / _YUEWEN_PAGE_SIZE) + 1):
                timestamp = int(time.time())
                params = {
                    'email': email,
                    'version': version,
                    'timestamp': timestamp,
                    'start_time': start_time,
                    'end_time': end_time,
                    'page': page,
                    # Cursor fields echo the previous page's response.
                    'last_min_id': page_data['min_id'],
                    'last_max_id': page_data['max_id'],
                    'total_count': page_data['total_count'],
                    'last_page': page_data['page'],
                    'order_status': order_status,
                }
                resp = requests.get(url=_YUEWEN_URL, params=_yuewen_signed_params(appsecert, params)).json()
                if resp['code'] != 0:
                    print('阅文查询充值接口异常:', resp, timestamp, int(time.time()))
                    break
                page_data = resp['data']
                order_list.extend(_build_yuewen_row(x) for x in page_data['list'])
        start_time = start_time + _ONE_DAY
    return tuple(order_list)


# ---- (2) Zhangdu ----
def get_zhangdu_order(st, et):
    """Fetch Zhangdu paid orders between *st* and *et* for every configured account."""
    start_exec_seconds = date_util.getCurrentSecondTime()
    with ProcessPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        futures = [executor.submit(get_zhangdu_order_task, st, et, account)
                   for account in al.zhangdu_account_list]
    total_order = _merge_results(futures)
    print('掌读订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order


def get_zhangdu_order_task(st, et, account):
    """Fetch one Zhangdu account's paid orders (status '1').

    account is (uid, secret, channel name). The range [st, et] is queried in
    windows of at most 90 days, and each window is paged.
    """
    order_list = []
    url = 'https://api.zhangdu520.com/channel/getorder'
    uid = account[0]
    appsecert = account[1]
    channel = account[2]
    timestamp = int(time.time())
    sign = md5value(str(uid) + '&' + appsecert + '&' + str(timestamp))
    timespace = 90 * _ONE_DAY
    for window in range((et - st) // timespace + 1):
        if window > 0:
            print('掌读跨天数查询:', window)
        # Derive the window from its index so that an empty window (0 pages)
        # still advances to the next one.
        starttime = st + window * timespace
        endtime = min(et, starttime + timespace)
        params = {
            'uid': uid,
            'timestamp': timestamp,
            'sign': sign,
            'starttime': starttime,
            'endtime': endtime,
        }
        page_count = requests.get(url=url, params=params).json()['data']['pageCount']
        for page in range(1, page_count + 1):
            resp = requests.get(url=url, params=dict(params, page=page)).json()
            if 'data' not in resp:
                continue
            for b in resp['data']['list']:
                if b['status'] != '1':
                    continue
                order_list.append(_to_row({
                    'amount': b['amount'],
                    'channel_id': uid,
                    'order_id': str(b['orderno']),
                    'order_time': b['ctime'],
                    'user_id': b['openid'],
                    'platform': '掌读',
                    'channel': channel,
                    'reg_time': b['regtime'],
                    'from_novel': '',
                    'stage': '',
                    'date': _utc8_day_start(int(b['ctime'])),
                }))
    return tuple(order_list)


# ---- (3) Huasheng ----
def get_huasheng_order(st, et):
    """Fetch Huasheng paid orders for every merchant of every configured account.

    Merchant lists are fetched in this process; each merchant's orders are
    fetched in a worker process.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    url = 'https://vip.rlcps.cn/api/getMerchants'
    futures = []
    with ProcessPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        for account in al.huasheng_account_list:
            apiKEY = account[0]
            apiSecurity = account[1]
            timestamp = str(int(time.time()))
            sign = md5value(apiKEY + timestamp + apiSecurity).upper()
            params = {
                'apiKey': apiKEY,
                'apiSecurity': apiSecurity,
                'timestamp': timestamp,
                'sign': sign,
            }
            merchant_list = requests.post(url, params).json()
            futures.extend(executor.submit(get_huasheng_order_task, st, et, account, merchant)
                           for merchant in merchant_list['data'])
    total_order = _merge_results(futures)
    print('花生订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order


def get_huasheng_order_task(st, et, account, merchant):
    """Fetch one Huasheng merchant's paid orders (order_status == 1), day by day.

    account is (apiKey, apiSecurity, stage); merchant carries 'merchant_id'
    and 'merchant_name'. Pages hold 500 orders.
    """
    order_list = []
    apiKEY = account[0]
    apiSecurity = account[1]
    stage = account[2]
    timestamp = str(int(time.time()))
    merchant_id = merchant['merchant_id']
    merchant_name = merchant['merchant_name']
    order_url = 'https://vip.rlcps.cn/api/orderList'
    start_time = st
    for _ in range((et - st) // _ONE_DAY):
        date = time.strftime("%Y-%m-%d", time.localtime(start_time))
        sign = md5value(apiKEY + date + str(merchant_id) + timestamp + apiSecurity).upper()
        base_params = {
            'apiKey': apiKEY,
            'apiSecurity': apiSecurity,
            'timestamp': timestamp,
            'date': date,
            'merchant_id': merchant_id,
            'sign': sign,
        }
        first_page = requests.post(order_url, dict(base_params, page=1)).json()
        if 'data' in first_page and len(first_page['data']) > 0:
            page_total = int(math.ceil(first_page['count'] / 500))
            for page in range(1, page_total + 1):
                # Page 1 was already fetched above; reuse it instead of re-requesting.
                resp = first_page if page == 1 else requests.post(order_url, dict(base_params, page=page)).json()
                for x in resp['data']:
                    if x['order_status'] != 1:
                        continue
                    order_list.append(_to_row({
                        'user_id': x['openid'],
                        'order_id': x['trans_id'],
                        'order_time': x['pay_at'],
                        'reg_time': x['join_at'],
                        'date': _utc8_day_start(start_time),
                        'channel': merchant_name,
                        'channel_id': merchant_id,
                        'platform': '花生',
                        'stage': stage,
                        'from_novel': x['book_name'],
                        'amount': x['amount'],
                    }))
        start_time = start_time + _ONE_DAY
    return tuple(order_list)


# ---- (4) Zhangzhongyun ----
def get_zzy_order(st, et):
    """Fetch Zhangzhongyun paid orders for every sub-channel of every account.

    Sub-channel lists are fetched in this process; accounts whose response has
    no 'data' are reported and skipped. Each sub-channel's orders are fetched
    in a worker process.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    url = 'https://openapi.818tu.com/partners/channel/channels/list?'
    futures = []
    with ProcessPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        for account in al.zzy_account_list:
            my_key = account[0]
            secert = account[1]
            my_sign = md5value(secert + 'key=' + my_key)
            channel_json = requests.get(url + 'key=' + my_key + '&sign=' + my_sign).json()
            if 'data' not in channel_json:
                print('掌中云本次请求数据为空', account)
                continue
            futures.extend(executor.submit(get_zzy_order_task, st, et, account, item)
                           for item in channel_json['data']['items'])
    total_order = _merge_results(futures)
    print('掌中云订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order


def _zzy_orders_query(secert, my_key, channel_id, get_time, limit_time, per_page, status, page=None):
    """Build the signed query string for the Zhangzhongyun orders/list endpoint.

    The sign covers the parameters in alphabetical order; 'page' is only
    included (in both the sign and the query) when *page* is given.
    """
    page_part = '' if page is None else '&page=' + str(page)
    sign = md5value(secert + 'channel_id=' + str(channel_id) + '&created_at[gt]=' + get_time
                    + '&created_at[lt]=' + limit_time + '&key=' + my_key + page_part
                    + '&per_page=' + per_page + '&status=' + status)
    gt = parse.urlencode({'created_at[gt]': get_time})
    lt = parse.urlencode({'created_at[lt]': limit_time})
    return ('channel_id=' + str(channel_id) + '&' + gt + '&' + lt + page_part
            + '&per_page=' + per_page + '&status=' + status + '&key=' + my_key + '&sign=' + sign)


def _build_zzy_row(a, channel, channel_id, stage):
    """Convert one Zhangzhongyun order item into an order row (price is in cents)."""
    if str(a['from_novel_id']) != 'None':
        from_novel = a['from_novel']['title']
    else:
        from_novel = 'None'
    return _to_row({
        'user_id': str(a['member']['openid']),
        'channel': channel,
        'reg_time': a['member']['created_at'],
        'channel_id': channel_id,
        'amount': round(a['price'] / 100, 2),
        'order_id': str(a['id']),
        'order_time': a['created_at'],
        'platform': '掌中云',
        'stage': stage,
        'date': _utc8_day_start(_local_str_to_ts(a['created_at'][0:10], "%Y-%m-%d")),
        'from_novel': from_novel,
    })


def get_zzy_order_task(st, et, account, item):
    """Fetch one Zhangzhongyun sub-channel's orders with status 1.

    account is (key, secret, stage); item carries the sub-channel 'id' and
    'nickname'.
    """
    # Zhangzhongyun expects its own time format; convert before signing.
    get_time = platform_util.get_zhangzhongyun_format_time(st)
    limit_time = platform_util.get_zhangzhongyun_format_time(et)
    my_key = account[0]
    secert = account[1]
    stage = account[2]
    channel_id = item['id']
    channel = item['nickname']
    status = str(1)
    per_page = str(1000)
    url_1 = 'https://openapi.818tu.com/partners/channel/orders/list?'

    first = requests.get(url_1 + _zzy_orders_query(
        secert, my_key, channel_id, get_time, limit_time, per_page, status)).json()
    # count // per_page + 1 may request one trailing empty page; that is harmless.
    page_total = first['data']['count'] // int(per_page) + 1
    order_list = []
    for page in range(1, page_total + 1):
        items = requests.get(url_1 + _zzy_orders_query(
            secert, my_key, channel_id, get_time, limit_time, per_page, status, page)).json()['data']['items']
        order_list.extend(_build_zzy_row(a, channel, channel_id, stage) for a in items)
    return tuple(order_list)


# ---- (5) Youshuge ----
_YSG_URL = 'https://novel.youshuge.com/v2/open/orders'


def get_ysg_order(st, et):
    """Fetch Youshuge paid orders between *st* and *et* for every configured account."""
    start_exec_seconds = date_util.getCurrentSecondTime()
    with ProcessPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        futures = [executor.submit(get_ysg_order_task, st, et, account)
                   for account in al.ysg_account_list]
    total_order = _merge_results(futures)
    print('悠书阁订单数量:', len(total_order), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
    return total_order


def _post_ysg_page(host_name, channel_id, secert_key, start_date, end_date, page):
    """POST one signed page of the Youshuge paid-order query; return the raw response."""
    timestamp = int(time.time())
    str1 = ('channel_id=' + str(channel_id) + '&end_date=' + end_date + '&host_name=' + host_name
            + '&page=' + str(page) + '&pay_status=1' + '&start_date=' + start_date
            + '&time=' + str(timestamp) + '&key=' + secert_key)
    data = {
        'sign': md5value(str1).upper(),
        'host_name': host_name,
        'time': timestamp,
        'channel_id': channel_id,
        'page': page,
        'pay_status': 1,
        'start_date': start_date,
        'end_date': end_date,
    }
    return requests.post(_YSG_URL, data)


def get_ysg_order_task(st, et, account):
    """Fetch one Youshuge account's paid orders.

    account is (host_name, channel_id, secret_key, channel name, stage).
    Results come back 100 per page; page 1 is fetched first and pages
    2..ceil(count / 100) afterwards.
    """
    max_page_size = 100
    host_name = account[0]
    channel_id = account[1]
    secert_key = account[2]
    channel = account[3]
    stage = account[4]
    start_date = time.strftime("%Y-%m-%d", time.localtime(st))
    end_date = time.strftime("%Y-%m-%d", time.localtime(et))

    respone = _post_ysg_page(host_name, channel_id, secert_key, start_date, end_date, 1)
    if respone.status_code == 400:
        print('respone', respone)
    result_json = respone.json()
    order_list = list(build_ysg_order_data(channel, channel_id, result_json, stage))
    if not order_list:
        return ()
    total_count = result_json['data'][0]['count']
    page_total = (total_count - 1) // max_page_size + 1
    # Start at page 2: page 1 is already in order_list (re-fetching it
    # duplicated every first-page order).
    for page in range(2, page_total + 1):
        r2 = _post_ysg_page(host_name, channel_id, secert_key, start_date, end_date, page).json()
        order_list.extend(build_ysg_order_data(channel, channel_id, r2, stage))
    return tuple(order_list)


def build_ysg_order_data(channel, channel_id, result_json, stage):
    """Convert a Youshuge order response into a tuple of order rows.

    Returns an empty tuple when the response has no (or empty) 'data'.
    Prices are in cents.
    """
    rows = []
    for x in result_json.get('data') or ():
        rows.append(_to_row({
            'date': _utc8_day_start(_local_str_to_ts(x['create_time'][0:10], "%Y-%m-%d")),
            'order_id': x['order_num'],
            'amount': round(int(x['price']) / 100, 2),
            'order_time': x['create_time'],
            'channel': channel,
            'from_novel': x['book_name'],
            'stage': stage,
            'user_id': x['openid'],
            'channel_id': channel_id,
            'platform': '悠书阁',
            'reg_time': x['reg_time'],
        }))
    return tuple(rows)


def mysql_insert_order(data):
    """Write order rows with REPLACE INTO, so an existing order_id is overwritten.

    *data* is a sequence of 11-tuples in _to_row column order. Empty input is
    reported and skipped; insert errors are printed, not raised.
    """
    if not data:
        print('数据为空,不执行数据库操作!')
        return
    sql = 'replace into quchen_text.`order_zwg` (amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    connect = MySQLConnection('test')
    try:
        num = connect.batch(sql, data)
        connect.commit()
        print(num, '条订单数据入库成功')
    except Exception as e:
        print('订单数据入库失败:', e)
    finally:
        connect.close()


def mysql_select_platform_order_count(date):
    """Return per-platform row counts stored for *date* (rows with 'platform' and 'num').

    Returns an empty list when the query fails or yields nothing.
    """
    sql = 'SELECT platform, COUNT(1) AS num FROM quchen_text.`order_zwg` WHERE date = %s GROUP BY platform'
    connect = MySQLConnection('test')
    try:
        return connect.query(sql, date) or []
    except Exception as e:
        print('各平台的订单数据查询失败:', e)
        return []
    finally:
        connect.close()


# Platform name -> fetch function. Iteration order is the order in which
# platforms absent from the local DB are back-filled.
_PLATFORM_FETCHERS = {
    '阅文': get_yuewen_order,
    '悠书阁': get_ysg_order,
    '掌读': get_zhangdu_order,
    '掌中云': get_zzy_order,
    '花生': get_huasheng_order,
}
# Order used when the local DB has no rows at all for yesterday.
_FULL_REFILL_ORDER = ('掌中云', '阅文', '花生', '悠书阁', '掌读')


def start_all_job():
    """Reconcile yesterday's orders for every platform with the local database.

    If nothing is stored for yesterday, every platform is fetched and inserted.
    Otherwise each platform present in the DB is re-fetched, any count
    difference is printed and the rows are re-written; known platforms
    missing from the DB are then fetched and inserted. Unknown platform
    names found in the DB are reported and skipped.
    """
    start_exec_seconds = date_util.getCurrentSecondTime()
    st_unix = date_util.getYesterdayStartTime()
    et_unix = date_util.getTodayStartTime()
    platform_order_num_list = mysql_select_platform_order_count(st_unix)
    print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
    print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))

    if len(platform_order_num_list) == 0:
        print('本地库中没有任何数据,现在全平台补全')
        for platform in _FULL_REFILL_ORDER:
            mysql_insert_order(_PLATFORM_FETCHERS[platform](st_unix, et_unix))
    else:
        missing_platforms = list(_PLATFORM_FETCHERS)
        for platform_order_num in platform_order_num_list:
            platform = str(platform_order_num['platform'])
            num = int(platform_order_num['num'])
            fetch = _PLATFORM_FETCHERS.get(platform)
            if fetch is None:
                # Previously list.remove() raised ValueError before this was reached.
                print('发现未知平台数据!', platform_order_num)
                continue
            if platform in missing_platforms:
                missing_platforms.remove(platform)
            order_list = fetch(st_unix, et_unix)
            if len(order_list) != num:
                print(platform + '数据实际订单和已经入库数据差异:', len(order_list) - num)
            mysql_insert_order(order_list)
        for platform in missing_platforms:
            print(platform + '没有数据')
            mysql_insert_order(_PLATFORM_FETCHERS[platform](st_unix, et_unix))
    print('订单检查执行时间(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # Cron trigger: fires once at 03:35 and once at 04:35 every day.
    scheduler.add_job(start_all_job, 'cron', hour='3-4', minute='35')
    scheduler.start()