Prechádzať zdrojové kódy

测试代码修改:
增加异常装饰器,再也不用担心一个平台出现异常导致后续平台也爬不到数据啦

zhengwangeng 4 rokov pred
rodič
commit
9c64e9688a

+ 9 - 1
README.md

@@ -17,9 +17,17 @@ Data grabbing platform(DGP)数据采集平台
 
 * 数据库使用`dbutils`连接池,并独立配置mysql的配置文件`db_config.ini`
 * 平台订单数据爬取采用线程池子线程爬取,大大提升爬取速度
-* 平台账号配置使用csv配置文件存储,方便修改同步,并支持直接修改后,下次获取即可生效,无需重启脚本
+* 平台账号配置(`conf/account`)使用csv配置文件存储,方便修改同步,并支持直接修改后,下次获取即可生效,无需重启脚本
 
+#### 后续优化点
 
+- [x] 数据库连接需要单独配置
+- [x] 账号,token配置也需要独立成配置文件,方便实时获取最新的数据,而不需要重启脚本
+- [x] 多线程,异步获取订单数据,目前获取数据都是单线程,太慢了,掌中云2020-09-25的订单数据共12106条,执行时间为529秒,需要特别优化。
+- [ ] 业务报警,钉钉提醒,邮件?
+- [ ] 异常处理
+- [ ] 日志输出
+- [ ] 全局常量配置
 
 #### 使用说明
 

+ 4 - 4
dgp/tests/check_order.py

@@ -35,9 +35,9 @@ from urllib import parse
 import requests
 
 import account_list_zwg as al
-from MySQLConnection import MySQLConnection
+from util.MySQLConnection import MySQLConnection
 from util import date_util
-from util import platform_util
+from util import platform_config_util
 
 
 def md5value(s):
@@ -458,8 +458,8 @@ def get_zzy_order(st, et):
 
 def get_zzy_order_task(st, et, account, item):
     # 掌中云的时间格式比较特殊,转换下
-    st = platform_util.get_zhangzhongyun_format_time(st)
-    et = platform_util.get_zhangzhongyun_format_time(et)
+    st = platform_config_util.get_zhangzhongyun_format_time(st)
+    et = platform_config_util.get_zhangzhongyun_format_time(et)
 
     order_list = ()
 

+ 33 - 690
dgp/tests/check_order_new.py

@@ -24,653 +24,9 @@ __title__ = '每日凌晨空闲时检查本地数据库中的订单数据是否
                   ┗┻┛  ┗┻┛
 """
 
-import datetime
-import hashlib
-import math
-import random
-import time
-import traceback
-from concurrent.futures import ProcessPoolExecutor
-from urllib import parse
-
-import requests
-
-import account_list_zwg as al
-from MySQLConnection import MySQLConnection
 from util import date_util
-from util import platform_util
-
-
-def md5value(s):
-    md5 = hashlib.md5()
-    md5.update(s.encode("utf-8"))
-    return md5.hexdigest()
-
-
-##《1》阅文
-def get_yuewen_order(st, et):
-    start_exec_seconds = date_util.getCurrentSecondTime()
-    total_order_list = ()
-    account_list = platform_util.get_yuewen_account_list()
-
-    executor = ProcessPoolExecutor(max_workers=5)
-
-    futures = []
-    for account in account_list:
-        future = executor.submit(get_yuewen_order_task, st, et, account)
-        futures.append(future)
-
-    executor.shutdown(True)
-
-    for future in futures:
-        order_list = future.result()
-        if len(order_list) > 0:
-            total_order_list = order_list + total_order_list
-
-    print('阅文订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
-    return total_order_list
-
-
-def get_yuewen_order_task(st, et, account):
-    order_list = ()
-
-    email = account[0]
-    appsecert = account[1]
-
-    url = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
-    version = 1
-    order_status = 2 #已支付
-    page_count = 100 #每页100条数据
-    start_time = st
-
-    for i in range((et - st) // 86400 + 1):
-        page = 1
-        last_min_id = ''
-        last_max_id = ''
-        total_count = ''
-        last_page = ''
-
-        while True:
-            if start_time == et:
-                break
-
-            end_time = min(start_time + 86400, et)
-            timestamp = int(time.time())
-
-            params = {
-                'email': email,
-                'version': version,
-                'timestamp': timestamp,
-                'start_time': start_time,
-                'end_time': end_time,
-                'page': page,
-                'order_status': order_status
-            }
-
-            if page > 1:
-                params['last_min_id'] = last_min_id
-                params['last_max_id'] = last_max_id
-                params['total_count'] = total_count
-                params['last_page'] = last_page
-
-            sorted_data = sorted(params.items())
-            str_params = ''
-            for k, v in sorted_data:
-                str_params = str_params + str(k) + str(v)
-
-            sign = md5value(appsecert + str_params).upper()
-
-            #放入签名
-            params['sign'] = sign
-            response_result_json = requests.get(url=url, params=params).json()
-
-            code = response_result_json['code']
-            ## 此接口有调用频率限制,相同查询条件每分钟仅能请求一次
-            if code != 0:
-                print('阅文查询充值接口异常:', response_result_json, '参数', params)
-                break
-                # if code == 10408:
-                #     if fail_count > 0:
-                #         break
-                #
-                #     sleep_seconds = random.randint(60, 70)
-                #     print('阅文获取订单数据线程休眠【{sleep_seconds}】秒,因为该接口有一分钟的限制'.format(sleep_seconds=sleep_seconds))
-                #     time.sleep(sleep_seconds)
-                #
-                #     print('重试一次')
-                #     fail_count = fail_count + 1
-                #     get_yuewen_order_task(st, et, account, fail_count)
-
-            response_data = response_result_json['data']
-            total_count = response_data['total_count']
-
-            if total_count == 0:
-                continue
-
-            last_min_id = response_data['min_id']
-            last_max_id = response_data['max_id']
-            last_page = response_data['page']
-            order_item_list = response_data['list']
-
-            for order_item in order_item_list:
-                order_time = order_item['order_time']
-                dtime = datetime.datetime.strptime(order_time, "%Y-%m-%d %H:%M:%S")
-                order_time_unix = int(time.mktime(dtime.timetuple()))
-                order_id = order_item['order_id']
-                if date_util.checkInterval(start_time, end_time, order_time_unix) == False:
-                    print('阅文账号【{key}】, 查询时间【{start_time} - {end_time}】,有不符合该时间范围的订单,订单Id【{order_id}】的时间为【{order_time}】'
-                          .format(key=email, start_time=date_util.getSecondsToDatetime(start_time),
-                                  end_time=date_util.getSecondsToDatetime(end_time),order_id=order_id, order_time=order_time))
-                    continue
-
-                order = {}
-                order['date'] = ((order_time_unix + 8 * 3600) // 86400) * 86400 - 8 * 3600
-                order['platform'] = '阅文'
-                order['channel'] = order_item['app_name']
-                order['from_novel'] = order_item['book_name']
-                order['user_id'] = order_item['openid']
-                order['stage'] = ''
-                order['channel_id'] = 0
-                order['order_time'] = order_time
-                order['amount'] = order_item['amount']
-                order['reg_time'] = order_item['reg_time']
-                order['order_id'] = order_id
-
-                order = sorted(order.items(), key=lambda item: item[0])
-                order = dict(order)
-                order = tuple(order.values())
-                order_list = order_list + ((order),)
-
-            # print('阅文账号【{key}】, 查询时间【{start_time} - {end_time}】,当前页【{page}】,本次查询订单数量【{total_count}】'
-            #       .format(key=email, start_time=date_util.getSecondsToDatetime(start_time),
-            #               end_time=date_util.getSecondsToDatetime(end_time),page=page, total_count=total_count))
-
-            if int(page) >= math.ceil(total_count / int(page_count)):
-                break
-
-            page = page + 1
-
-        start_time = start_time + 86400  #天数加1
-
-        # sleep_seconds = random.randint(60, 70)
-        # print('阅文获取订单数据线程休眠【{sleep_seconds}】秒,因为该接口有一分钟的限制'.format(sleep_seconds=sleep_seconds))
-        # time.sleep(sleep_seconds)
-
-    return order_list
-
-
-##《2》掌读
-def get_zhangdu_order(st, et):
-    start_exec_seconds = date_util.getCurrentSecondTime()
-    total_order_list = ()
-    account_list = platform_util.get_zhangdu_account_list()
-
-    executor = ProcessPoolExecutor(max_workers=5)
-
-    futures = []
-    for account in account_list:
-        future = executor.submit(get_zhangdu_order_task, st, et, account)
-        futures.append(future)
-    executor.shutdown(True)
-
-    for future in futures:
-        order_list = future.result()
-        if len(order_list) > 0:
-            total_order_list = order_list + total_order_list
-
-    print('掌读订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
-    return total_order_list
-
-
-def get_zhangdu_order_task(st, et, account):
-    order_list = ()
-    url = 'https://api.zhangdu520.com/channel/getorder'
-
-    uid = account[0]
-    appsecert = account[1]
-    channel = account[2]
-    timestamp = int(time.time())
-    sign = md5value(str(uid) + '&' + appsecert + '&' + str(timestamp))
-    starttime = st
-    timespace = 90 * 3600 * 24
-    endtime = min(et, st + timespace)
-
-    for x in range((et - st) // timespace + 1):  # 分时段
-        if x > 0:
-            print('掌读跨天数查询:', x)
-
-        params = {
-            'uid': uid,
-            'timestamp': timestamp,
-            'sign': sign,
-            'starttime': starttime,
-            'endtime': endtime
-        }
-        response_result_json = requests.get(url=url, params=params).json()
-        pageCount = response_result_json['data']['pageCount']
-        if pageCount == 0:
-            continue
-
-        for page in range(1, pageCount + 1):  # 分页
-            params = {
-                'uid': uid,
-                'timestamp': timestamp,
-                'sign': sign,
-                'starttime': starttime,
-                'endtime': endtime,
-                'page': page
-            }
-            list2 = requests.get(url=url, params=params).json()
-            if 'data' in list2.keys():
-                for b in list2['data']['list']:
-                    if b['status'] == '1':
-                        c = {}
-                        c['amount'] = b['amount']
-                        c['channel_id'] = uid
-                        c['order_id'] = str(b['orderno'])
-                        c['order_time'] = b['ctime']
-                        c['user_id'] = b['openid']
-                        c['platform'] = '掌读'
-                        c['channel'] = channel
-                        c['reg_time'] = b['regtime']
-                        c['from_novel'] = ''
-                        c['stage'] = ''
-                        c['date'] = ((int(b['ctime']) + 8 * 3600) // 86400) * 86400 - 8 * 3600
-
-                        x = sorted(c.items(), key=lambda item: item[0])
-                        x = dict(x)
-                        x = tuple(x.values())
-                        order_list = order_list + ((x),)
-
-        starttime = starttime + timespace
-        endtime = min(et, starttime + timespace)
-
-    return order_list
-
-
-##《3》花生
-def get_huasheng_order(st, et):
-    start_exec_seconds = date_util.getCurrentSecondTime()
-    total_order_list = ()
-    account_list = platform_util.get_huasheng_account_list()
-
-    executor = ProcessPoolExecutor(max_workers=5)
-
-    futures = []
-    for account in account_list:
-        url = 'https://vip.rlcps.cn/api/getMerchants'
-        apiKey = str(account[0])
-        apiSecurity = account[1]
-        timestamp = str(int(time.time()))
-        sign = md5value(apiKey + timestamp + apiSecurity).upper()
-        params = {
-            'apiKey': apiKey,
-            'apiSecurity': apiSecurity,
-            'timestamp': timestamp,
-            'sign': sign
-        }
-        response_result_json = requests.post(url, params).json()
-
-        if 'data' not in response_result_json.keys():
-            # print('花生账号【{apiKey}】本次请求数据为空,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json))
-            continue
-
-        for merchant in response_result_json['data']:
-            future = executor.submit(get_huasheng_order_task, st, et, account, merchant)
-            futures.append(future)
-
-    executor.shutdown(True)
-
-    for future in futures:
-        order_list = future.result()
-        if len(order_list) > 0:
-            total_order_list = order_list + total_order_list
-
-    print('花生订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
-    return total_order_list
-
-
-def get_huasheng_order_task(st, et, account, merchant):
-    order_list = ()
-
-    apiKey = str(account[0])
-    apiSecurity = account[1]
-    stage = account[2]
-    timestamp = str(int(time.time()))
-
-    order_url = 'https://vip.rlcps.cn/api/orderList'
-    merchant_id = merchant['merchant_id']
-    merchant_name = merchant['merchant_name']
-    start_time = st
-    limit = 500
-
-    for i in range((et - st) // 86400 + 1):
-        page = 1
-
-        while True:
-            date = time.strftime("%Y-%m-%d", time.localtime(start_time))
-            sign = md5value(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper()
-            order_params = {
-                'apiKey': apiKey,
-                'apiSecurity': apiSecurity,
-                'timestamp': timestamp,
-                'date': date,
-                'merchant_id': merchant_id,
-                'sign': sign,
-                'page': page,
-                'limit': limit
-            }
-            response_result_json = requests.post(order_url, order_params).json()
-
-            if 'data' not in response_result_json.keys() or len(response_result_json['data']) == 0:
-                # print('花生账号【{key}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据为空,响应报文【{result}】'
-                #       .format(key=apiKey, merchant_id=merchant_id, merchant_name=merchant_name,
-                #               result=response_result_json))
-                break
-
-            total_count = response_result_json['count']
-            order_item_list = response_result_json['data']
-
-            for order_item in order_item_list:
-                if order_item['order_status'] == 1:  # 1为已支付
-                    order = {}
-                    ##dtime = datetime.datetime.strptime(order_item['pay_at'],"%Y-%m-%d")
-                    ##order['date']= ((int(time.mktime(dtime.timetuple()))+8*3600)//86400)*86400-8*3600
-                    order['user_id'] = order_item['openid']
-                    order['order_id'] = order_item['trans_id']
-                    order['order_time'] = order_item['pay_at']
-                    order['reg_time'] = order_item['join_at']
-                    # TODO 花生的时间需要统一
-                    order['date'] = (start_time + 8 * 3600) // 86400 * 86400 - 8 * 3600
-                    order['channel'] = merchant_name
-                    order['channel_id'] = merchant_id
-                    order['platform'] = '花生'
-                    order['stage'] = stage
-                    order['from_novel'] = order_item['book_name']
-                    order['amount'] = order_item['amount']
-
-                    order = sorted(order.items(), key=lambda item: item[0])
-                    order = dict(order)
-                    order = tuple(order.values())
-                    order_list = order_list + ((order),)
-
-            if int(page) >= math.ceil(total_count / int(limit)):
-                break
-
-            # print('花生账号【{key}】, 渠道【{merchant_id}:{merchant_name}】当前页【{page}】,本次查询订单数【{total_count}】,即将查询下一页'
-            #       .format(key=apiKey, merchant_id=merchant_id, merchant_name=merchant_name, page=page, total_count=total_count))
-
-            page = page + 1
-
-        start_time = start_time + 86400  # 天数加1
-
-    return order_list
-
-
-##《4》掌中云
-def get_zzy_order(st, et):
-    start_exec_seconds = date_util.getCurrentSecondTime()
-    total_order_list = ()
-    account_list = al.zzy_account_list
-    # account_list = platform_util.get_zhangzhongyun_account_list()
-    # account_list = [['1108701f1d6','0f9c0f8429d1a16a8a78c2306e7a4db3','清勇7月']]
-    # account_list = [['1109295d56c','9bb955186597882ac473e86ba4576158','趣程20期']]
-
-    executor = ProcessPoolExecutor(max_workers=5)
-
-    futures = []
-    for account in account_list:
-        url = 'https://openapi.818tu.com/partners/channel/channels/list?'
-        key = account[0]
-        secert = account[1]
-        sign = md5value(secert + 'key=' + key)
-        params = 'key=' + key + '&sign=' + sign
-        response_result_json = requests.get(url + params).json()  # 获取子渠道列表
-
-        if 'data' not in response_result_json.keys():
-            # print('掌中云账号【{key}】本次请求数据为空,响应报文【{result}】'.format(key=key, result=response_result_json))
-            continue
-
-        items = response_result_json['data']['items']
-        for channel in items:
-            # 获取channel_id 后逐个拉取历史orders
-            future = executor.submit(get_zzy_order_task, st, et, account, channel)
-            futures.append(future)
-
-    executor.shutdown(True)
-
-    for future in futures:
-        order_list = future.result()
-        if len(order_list) > 0:
-            total_order_list = order_list + total_order_list
-
-    print('掌中云订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
-    return total_order_list
-
-
-def get_zzy_order_task(st, et, account, channel):
-    # 掌中云的时间格式比较特殊,转换下
-    st = platform_util.get_zhangzhongyun_format_time(st)
-    et = platform_util.get_zhangzhongyun_format_time(et)
-
-    order_list = ()
-
-    key = account[0]
-    secert = account[1]
-    stage = account[2]
-
-    order_url = 'https://openapi.818tu.com/partners/channel/orders/list?'
-    channel_id = channel['id']
-    channel_name = channel['nickname']
-    status = str(1)
-    page = str(1)
-    per_page = str(1000)
-    get_time = st
-    limit_time = et
-    gte = parse.urlencode({'created_at[gte]': get_time})  # gte就是ge 大于等于开始时间
-    lt = parse.urlencode({'created_at[lt]': limit_time})  # 小于 结束时间
-
-    while True:
-        sign = md5value(secert + 'channel_id=' + str(
-            channel_id) + '&created_at[gte]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str(
-            page) + '&per_page=' + per_page + '&status=' + status)
-        params = 'channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str(
-            page) + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign
-
-        response_result_json = requests.get(order_url + params).json()
-
-        if 'data' not in response_result_json.keys():
-            # print('掌中云账号【{key}】, 渠道【{channel_id}:{channel_name}】本次请求数据为空,响应报文【{result}】'
-            #       .format(key=key, channel_id=channel_id, channel_name=channel_name, result=response_result_json))
-            break
-
-        total_count = response_result_json['data']['count']  # 总数量
-        order_item_list = response_result_json['data']['items']  # 订单列表
-
-        for order_item in order_item_list:
-            order = {}
-            order['user_id'] = str(order_item['member']['openid'])
-            order['channel'] = channel_name
-            order['reg_time'] = order_item['member']['created_at']
-            order['channel_id'] = channel_id
-            order['amount'] = round(order_item['price'] / 100, 2)
-            order['order_id'] = str(order_item['id'])
-            order['order_time'] = order_item['created_at']
-            order['platform'] = '掌中云'
-            order['stage'] = stage
-            dtime = datetime.datetime.strptime(order_item['created_at'][0:10], "%Y-%m-%d")
-            order['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
-
-            if str(order_item['from_novel_id']) != 'None':
-                order['from_novel'] = order_item['from_novel']['title']
-            else:
-                order['from_novel'] = 'None'
-
-            x = sorted(order.items(), key=lambda item: item[0])
-            x = dict(x)
-            x = tuple(x.values())
-            order_list = order_list + ((x),)
-
-        if int(page) >= math.ceil(total_count / int(per_page)):
-            break
-
-        # print('掌中云账号【{key}】, 渠道【{channel_id}:{channel_name}】当前页【{page}】,本次查询订单数【{total_count}】,即将查询下一页'
-        #       .format(key=key, channel_id=channel_id, channel_name=channel_name, page=page, total_count=total_count))
-
-        page = int(page) + 1
-
-    return order_list
-
-
-##《5》 悠书阁
-def get_ysg_order(st, et):
-    start_exec_seconds = date_util.getCurrentSecondTime()
-    total_order_list = ()
-    account_list = platform_util.get_youshuge_account_list()
-
-    executor = ProcessPoolExecutor(max_workers=5)
-
-    futures = []
-    for account in account_list:
-        future = executor.submit(get_ysg_order_task, st, et, account)
-        futures.append(future)
-    executor.shutdown(True)
-
-    for future in futures:
-        order_list = future.result()
-        if len(order_list) > 0:
-            total_order_list = order_list + total_order_list
-
-    print('悠书阁订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
-    return total_order_list
-
-
-def get_ysg_order_task(st, et, account):
-    order_list = ()
-    url = 'https://novel.youshuge.com/v2/open/orders'
-    # 超过100条就需要分页,别问我为什么知道,看代码看出来的
-    max_page_size = 100
-
-    host_name = account[0]
-    channel_id = int(account[1])
-    secert_key = account[2]
-    channel = account[3]
-    stage = account[4]
-
-    timestamp = int(time.time())
-    start_date = time.strftime("%Y-%m-%d", time.localtime(st))
-    end_date = time.strftime("%Y-%m-%d", time.localtime(et))
-    page = 1
-    str1 = 'channel_id=' + str(channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
-        page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
-    sign = md5value(str1).upper()
-    data = {
-        'sign': sign,
-        'host_name': host_name,
-        'time': timestamp,
-        'channel_id': channel_id,
-        'page': page,
-        'pay_status': 1,
-        'start_date': start_date,
-        'end_date': end_date
-    }
-    respone = requests.post(url, data)
-    if respone.status_code == 400:
-        print('respone', respone)
-
-    result_json = respone.json()
-    first_page_order = build_ysg_order_data(channel, channel_id, result_json, stage)
-    order_list = order_list + first_page_order
-    if len(first_page_order) == 0:
-        return order_list
-
-    total_count = result_json['data'][0]['count']
-    if total_count > max_page_size:
-        for i in range((total_count - 1) // max_page_size + 1):
-            timestamp = int(time.time())
-            str1 = 'channel_id=' + str(
-                channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
-                page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
-            sign = md5value(str1).upper()
-            data2 = {
-                'sign': sign,
-                'host_name': host_name,
-                'time': timestamp,
-                'channel_id': channel_id,
-                'page': page,
-                'pay_status': 1,
-                'start_date': start_date,
-                'end_date': end_date
-            }
-            r2 = requests.post(url, data2).json()
-
-            order_list = order_list + build_ysg_order_data(channel, channel_id, r2, stage)
-            page = page + 1
-
-    return order_list
-
-
-def build_ysg_order_data(channel, channel_id, result_json, stage):
-    order_list = ()
-    if 'data' in result_json.keys():
-        data = result_json['data']
-        if len(data) > 0:
-            for x in data:
-                y = {}
-                dtime = datetime.datetime.strptime(x['create_time'][0:10], "%Y-%m-%d")
-                y['date'] = ((int(
-                    time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
-                y['order_id'] = x['order_num']
-                y['amount'] = round(int(x['price']) / 100, 2)
-                y['order_time'] = x['create_time']
-                y['channel'] = channel
-                y['from_novel'] = x['book_name']
-                y['stage'] = stage
-                y['user_id'] = x['openid']
-                y['channel_id'] = channel_id
-                y['platform'] = '悠书阁'
-                y['reg_time'] = x['reg_time']
-
-                y = sorted(y.items(), key=lambda item: item[0])
-                y = dict(y)
-                y = tuple(y.values())
-                order_list = order_list + ((y),)
-    return order_list
-
-
-# 数据导入表采用replace替换主键orderid的方法
-def mysql_insert_order(data):
-    if data is None or len(data) == 0:
-        print('数据为空,不执行数据库操作!')
-    else:
-        sql = 'replace into quchen_text.`order_zwg` (amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
-        connect = MySQLConnection()
-        try:
-            num = connect.batch(sql, data)
-            # 提交
-            connect.commit()
-            print(num, '条订单数据入库成功')
-        except Exception as e:
-            traceback.print_exc()
-            print('订单数据入库失败:', e)
-        finally:
-            connect.close()
-
-
-# 获取各平台的订单数量
-def mysql_select_platform_order_count(date):
-    sql = 'SELECT platform, COUNT(1) AS num FROM quchen_text.`order_zwg` WHERE date = %s GROUP BY platform'
-    connect = MySQLConnection()
-    platform_order_count = []
-    try:
-        platform_order_count = connect.query(sql, date)
-        return platform_order_count
-    except Exception as e:
-        traceback.print_exc()
-        print('各平台的订单数据查询失败:', e)
-    finally:
-        connect.close()
-    return platform_order_count
+from util import db_order_util
+from util import platform_order_api_util
 
 
 def start_all_job():
@@ -681,93 +37,80 @@ def start_all_job():
 
     # st_unix = 1601136000  # 2020/9/27 0:0:0
     # et_unix = 1601308800  # 2020/9/29 0:0:0
-    # et_unix = st_unix + 10  # 2020/9/29 0:0:0
 
     print('查询开始时间:', st_unix, date_util.getSecondsToDatetime(st_unix))
     print('查询结束时间:', et_unix, date_util.getSecondsToDatetime(et_unix))
 
-    order_list = get_yuewen_order(st_unix, et_unix)
-    mysql_insert_order(order_list)
-
-    order_id_list = []
-    for order in order_list:
-        if order[5] == '4200000691202009304870914729':
-            print('存在', order[5])
-        order_id_list.append(order[5])
-
-    print(len(order_id_list))
-    # b = set(order_id_list)
-    # for each_b in b:
-    #     count = 0
-    #     for each_a in order_id_list:
-    #         if each_b == each_a:
-    #             count += 1
-    #     print(each_b, ": ", count)
-
-    exit_flag = True
-    if exit_flag:
-        exit()
+    # order_list = platform_order_api_util.get_yuewen_order(st_unix, et_unix)
+    # db_order_util.batch_save_order(order_list)
 
+    # print(platform_order_api_util.nor())
+    # platform_order_api_util.throw_exception()
+    # print(platform_order_api_util.nor())
+    #
+    # exit_flag = True
+    # if exit_flag:
+    #     exit()  #这里是为了测试,不让代码继续执行
 
-    platform_order_num_list = mysql_select_platform_order_count(date_util.getYesterdayStartTime())
+    platform_order_num_list = db_order_util.get_platform_order_count(date_util.getYesterdayStartTime())
     if len(platform_order_num_list) == 0:
         print('本地库中没有任何数据,现在全平台补全')
-        mysql_insert_order(get_zzy_order(st_unix, et_unix))
-        mysql_insert_order(get_yuewen_order(st_unix, et_unix))
-        mysql_insert_order(get_huasheng_order(st_unix, et_unix))
-        mysql_insert_order(get_ysg_order(st_unix, et_unix))
-        mysql_insert_order(get_zhangdu_order(st_unix, et_unix))
+        db_order_util.batch_save_order(platform_order_api_util.get_zhangzhongyun_order(st_unix, et_unix))
+        db_order_util.batch_save_order(platform_order_api_util.get_yuewen_order(st_unix, et_unix))
+        db_order_util.batch_save_order(platform_order_api_util.get_huasheng_order(st_unix, et_unix))
+        db_order_util.batch_save_order(platform_order_api_util.get_youshuge_order(st_unix, et_unix))
+        db_order_util.batch_save_order(platform_order_api_util.get_zhangdu_order(st_unix, et_unix))
     else:
-        platform_list = ['阅文','悠书阁','掌读','掌中云','花生']
+        platform_list = ['阅文', '悠书阁', '掌读', '掌中云', '花生']
         for platform_order_num in platform_order_num_list:
             platform = str(platform_order_num['platform'])
             num = int(platform_order_num['num'])
             platform_list.remove(platform)
 
             if platform == '阅文':
-                order_list = get_yuewen_order(st_unix, et_unix)
+                order_list = platform_order_api_util.get_yuewen_order(st_unix, et_unix)
                 if len(order_list) != num:
                     print('阅文数据实际订单和已经入库数据差异:', len(order_list) - num)
-                    mysql_insert_order(order_list)
+                    db_order_util.batch_save_order(order_list)
             elif platform == '悠书阁':
-                order_list = get_ysg_order(st_unix, et_unix)
+                order_list = platform_order_api_util.get_youshuge_order(st_unix, et_unix)
                 if len(order_list) != num:
                     print('悠书阁数据实际订单和已经入库数据差异:', len(order_list) - num)
-                    mysql_insert_order(order_list)
+                    db_order_util.batch_save_order(order_list)
             elif platform == '掌读':
-                order_list = get_zhangdu_order(st_unix, et_unix)
+                order_list = platform_order_api_util.get_zhangdu_order(st_unix, et_unix)
                 if len(order_list) != num:
                     print('掌读数据实际订单和已经入库数据差异:', len(order_list) - num)
-                    mysql_insert_order(order_list)
+                    db_order_util.batch_save_order(order_list)
             elif platform == '掌中云':
-                order_list = get_zzy_order(st_unix, et_unix)
+                order_list = platform_order_api_util.get_zhangzhongyun_order(st_unix, et_unix)
                 if len(order_list) != num:
                     print('掌中云数据实际订单和已经入库数据差异:', len(order_list) - num)
-                    mysql_insert_order(order_list)
+                    db_order_util.batch_save_order(order_list)
             elif platform == '花生':
-                order_list = get_huasheng_order(st_unix, et_unix)
+                order_list = platform_order_api_util.get_huasheng_order(st_unix, et_unix)
                 if len(order_list) != num:
                     print('花生数据实际订单和已经入库数据差异:', len(order_list) - num)
-                    mysql_insert_order(order_list)
+                    db_order_util.batch_save_order(order_list)
             else:
                 print('发现未知平台数据!', platform_order_num)
 
         for platform in platform_list:
             if platform == '阅文':
                 print('阅文没有数据')
-                mysql_insert_order(get_yuewen_order(st_unix, et_unix))
+                db_order_util.batch_save_order(platform_order_api_util.get_yuewen_order(st_unix, et_unix))
             elif platform == '悠书阁':
                 print('悠书阁没有数据')
-                mysql_insert_order(get_ysg_order(st_unix, et_unix))
+                db_order_util.batch_save_order(platform_order_api_util.get_youshuge_order(st_unix, et_unix))
             elif platform == '掌读':
                 print('掌读没有数据')
-                mysql_insert_order(get_zhangdu_order(st_unix, et_unix))
+                db_order_util.batch_save_order(platform_order_api_util.get_zhangdu_order(st_unix, et_unix))
             elif platform == '掌中云':
                 print('掌中云没有数据')
-                mysql_insert_order(get_zzy_order(st_unix, et_unix))
+                db_order_util.batch_save_order(platform_order_api_util.get_zhangzhongyun_order(st_unix, et_unix))
             elif platform == '花生':
                 print('花生没有数据')
-                mysql_insert_order(get_huasheng_order(st_unix, et_unix))
+                db_order_util.batch_save_order(platform_order_api_util.get_huasheng_order(st_unix, et_unix))
             else:
                 print('什么鬼平台:', platform)
 

+ 1 - 1
dgp/tests/demo/test.py

@@ -24,7 +24,7 @@ __title__ = '测试类'
                   ┗┻┛  ┗┻┛
 """
 
-from LoggerService import LoggerService
+from util.LoggerService import LoggerService
 # import account_list as al
 from util import date_util
 

+ 1 - 0
dgp/tests/demo/test_pool.py

@@ -23,6 +23,7 @@ __title__ = '线程池测试'
                   ┃┫┫  ┃┫┫
                   ┗┻┛  ┗┻┛
 """
+
 import os
 import random
 import time

+ 0 - 0
dgp/tests/LoggerService.py → dgp/tests/util/LoggerService.py


+ 2 - 2
dgp/tests/MySQLConnection.py → dgp/tests/util/MySQLConnection.py

@@ -42,8 +42,8 @@ class MySQLConnection():
 
     def __init__(self):
         # 获取当前文件所在目录的上一级目录
-        parent_dir_path = os.path.dirname(os.path.abspath('.'))
-        db_config_path = parent_dir_path + '/tests/conf/db_config.ini'
+        parent_dir_path = os.path.dirname(os.path.abspath('..'))
+        db_config_path = parent_dir_path + '/dgp/tests/conf/db_config.ini'
         # print('数据库配置文件:', db_config_path)
 
         # 读取数据库配置信息

+ 0 - 0
dgp/tests/QueryType.py → dgp/tests/util/QueryType.py


+ 1 - 0
dgp/tests/util/date_util.py

@@ -264,6 +264,7 @@ def getCurrentWeekTime():
     timestamps = time.mktime(datetime.date.fromtimestamp(times).timetuple())
     return str(round(timestamps))
 
+
 def checkInterval(start_time, end_time, need_compare_time):
     """
     description:  判断 need_compare_time 是否在 start_time 和 end_time区间里

+ 60 - 0
dgp/tests/util/db_order_util.py

@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+__title__ = '订单工具类,用于查询,修改订单等'
+
+@Time    : 2020/9/30 12:38
+@Author  : Kenny-PC
+@Software: PyCharm
+
+# code is far away from bugs with the god animal protecting
+    I love animals. They taste delicious.
+              ┏┓      ┏┓
+            ┏┛┻━━━┛┻┓
+            ┃      ☃      ┃
+            ┃  ┳┛  ┗┳  ┃
+            ┃      ┻      ┃
+            ┗━┓      ┏━┛
+                ┃      ┗━━━┓
+                ┃  神兽保佑    ┣┓
+                ┃ 永无BUG!   ┏┛
+                ┗┓┓┏━┳┓┏┛
+                  ┃┫┫  ┃┫┫
+                  ┗┻┛  ┗┻┛
+"""
+
+from util.MySQLConnection import MySQLConnection
+
+
+# 数据导入表采用replace替换主键orderid的方法
+def batch_save_order(data):
+    if data is None or len(data) == 0:
+        print('数据为空,不执行数据库操作!')
+    else:
+        sql = 'replace into quchen_text.`order_zwg` (amount,channel,channel_id,date,from_novel,order_id,order_time,platform,reg_time,stage,user_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
+        connect = MySQLConnection()
+        try:
+            num = connect.batch(sql, data)
+            # 提交
+            connect.commit()
+            print(num, '条订单数据入库成功')
+        except Exception as e:
+            print('订单数据入库失败:', e)
+        finally:
+            connect.close()
+
+
+# 获取各平台的订单数量
+def get_platform_order_count(date):
+    sql = 'SELECT platform, COUNT(1) AS num FROM quchen_text.`order_zwg` WHERE date = %s GROUP BY platform'
+    connect = MySQLConnection()
+    platform_order_count = []
+    try:
+        platform_order_count = connect.query(sql, date)
+        return platform_order_count
+    except Exception as e:
+        print('各平台的订单数据查询失败:', e)
+    finally:
+        connect.close()
+    return platform_order_count

+ 1 - 4
dgp/tests/util/platform_util.py → dgp/tests/util/platform_config_util.py

@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 
 """
-__title__ = '平台操作工具类'
+__title__ = '平台配置操作工具类'
 
 @Time    : 2020/9/26 21:51
 @Author  : Kenny-PC
@@ -93,9 +93,6 @@ def get_account_list(platform, account_file_name):
     return account_list
 
 
-# get_zhangzhongyun_account_list()
-
-
 def get_zhangzhongyun_format_time(st_unix):
     """
     description:  掌中云的时间格式比较特殊,需要转换下

+ 653 - 0
dgp/tests/util/platform_order_api_util.py

@@ -0,0 +1,653 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+__title__ = '各个平台的订单API接口'
+
+@Time    : 2020/9/30 12:33
+@Author  : Kenny-PC
+@Software: PyCharm
+
+# code is far away from bugs with the god animal protecting
+    I love animals. They taste delicious.
+              ┏┓      ┏┓
+            ┏┛┻━━━┛┻┓
+            ┃      ☃      ┃
+            ┃  ┳┛  ┗┳  ┃
+            ┃      ┻      ┃
+            ┗━┓      ┏━┛
+                ┃      ┗━━━┓
+                ┃  神兽保佑    ┣┓
+                ┃ 永无BUG!   ┏┛
+                ┗┓┓┏━┳┓┏┛
+                  ┃┫┫  ┃┫┫
+                  ┗┻┛  ┗┻┛
+"""
+
+import datetime
+import hashlib
+import math
+import time
+from concurrent.futures import ProcessPoolExecutor
+from urllib import parse
+
+import requests
+
+import account_list_zwg as al
+from util import date_util
+from util import platform_config_util
+from util import robust_util
+
+
+# md5加密,使用utf-8编码
+def md5(s):
+    md5 = hashlib.md5()
+    md5.update(s.encode("utf-8"))
+    return md5.hexdigest()
+
+
+# 阅文
+@robust_util.catch_exception
+def get_yuewen_order(st, et):
+    start_exec_seconds = date_util.getCurrentSecondTime()
+    total_order_list = ()
+    account_list = platform_config_util.get_yuewen_account_list()
+
+    executor = ProcessPoolExecutor(max_workers=5)
+
+    futures = []
+    for account in account_list:
+        future = executor.submit(get_yuewen_order_task, st, et, account)
+        futures.append(future)
+
+    executor.shutdown(True)
+
+    for future in futures:
+        order_list = future.result()
+        if len(order_list) > 0:
+            total_order_list = order_list + total_order_list
+
+    print('阅文订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
+    return total_order_list
+
+
+def get_yuewen_order_task(st, et, account):
+    order_list = ()
+
+    email = account[0]
+    appsecert = account[1]
+
+    url = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
+    version = 1
+    order_status = 2  # 已支付
+    page_count = 100  # 每页100条数据
+    start_time = st
+
+    for i in range((et - st) // 86400 + 1):
+        page = 1
+        last_min_id = ''
+        last_max_id = ''
+        total_count = ''
+        last_page = ''
+
+        while True:
+            if start_time == et:
+                break
+
+            end_time = min(start_time + 86400, et)
+            timestamp = int(time.time())
+
+            params = {
+                'email': email,
+                'version': version,
+                'timestamp': timestamp,
+                'start_time': start_time,
+                'end_time': end_time,
+                'page': page,
+                'order_status': order_status
+            }
+
+            if page > 1:
+                params['last_min_id'] = last_min_id
+                params['last_max_id'] = last_max_id
+                params['total_count'] = total_count
+                params['last_page'] = last_page
+
+            sorted_data = sorted(params.items())
+            str_params = ''
+            for k, v in sorted_data:
+                str_params = str_params + str(k) + str(v)
+
+            sign = md5(appsecert + str_params).upper()
+
+            # 放入签名
+            params['sign'] = sign
+            response_result_json = requests.get(url=url, params=params).json()
+
+            code = response_result_json['code']
+            ## 此接口有调用频率限制,相同查询条件每分钟仅能请求一次
+            if code != 0:
+                print('阅文查询充值接口异常:', response_result_json, '参数', params)
+                break
+                # if code == 10408:
+                #     if fail_count > 0:
+                #         break
+                #
+                #     sleep_seconds = random.randint(60, 70)
+                #     print('阅文获取订单数据线程休眠【{sleep_seconds}】秒,因为该接口有一分钟的限制'.format(sleep_seconds=sleep_seconds))
+                #     time.sleep(sleep_seconds)
+                #
+                #     print('重试一次')
+                #     fail_count = fail_count + 1
+                #     get_yuewen_order_task(st, et, account, fail_count)
+
+            response_data = response_result_json['data']
+            total_count = response_data['total_count']
+
+            if total_count == 0:
+                continue
+
+            last_min_id = response_data['min_id']
+            last_max_id = response_data['max_id']
+            last_page = response_data['page']
+            order_item_list = response_data['list']
+
+            for order_item in order_item_list:
+                order_time = order_item['order_time']
+                dtime = datetime.datetime.strptime(order_time, "%Y-%m-%d %H:%M:%S")
+                order_time_unix = int(time.mktime(dtime.timetuple()))
+                order_id = order_item['order_id']
+                if date_util.checkInterval(start_time, end_time, order_time_unix) == False:
+                    print('阅文账号【{key}】, 查询时间【{start_time} - {end_time}】,有不符合该时间范围的订单,订单Id【{order_id}】的时间为【{order_time}】'
+                          .format(key=email, start_time=date_util.getSecondsToDatetime(start_time),
+                                  end_time=date_util.getSecondsToDatetime(end_time), order_id=order_id,
+                                  order_time=order_time))
+                    continue
+
+                order = {}
+                order['date'] = ((order_time_unix + 8 * 3600) // 86400) * 86400 - 8 * 3600
+                order['platform'] = '阅文'
+                order['channel'] = order_item['app_name']
+                order['from_novel'] = order_item['book_name']
+                order['user_id'] = order_item['openid']
+                order['stage'] = ''
+                order['channel_id'] = 0
+                order['order_time'] = order_time
+                order['amount'] = order_item['amount']
+                order['reg_time'] = order_item['reg_time']
+                order['order_id'] = order_id
+
+                order = sorted(order.items(), key=lambda item: item[0])
+                order = dict(order)
+                order = tuple(order.values())
+                order_list = order_list + ((order),)
+
+            # print('阅文账号【{key}】, 查询时间【{start_time} - {end_time}】,当前页【{page}】,本次查询订单数量【{total_count}】'
+            #       .format(key=email, start_time=date_util.getSecondsToDatetime(start_time),
+            #               end_time=date_util.getSecondsToDatetime(end_time),page=page, total_count=total_count))
+
+            if int(page) >= math.ceil(total_count / int(page_count)):
+                break
+
+            page = page + 1
+
+        start_time = start_time + 86400  # 天数加1
+
+        # sleep_seconds = random.randint(60, 70)
+        # print('阅文获取订单数据线程休眠【{sleep_seconds}】秒,因为该接口有一分钟的限制'.format(sleep_seconds=sleep_seconds))
+        # time.sleep(sleep_seconds)
+
+    return order_list
+
+
+# 掌读
+@robust_util.catch_exception
+def get_zhangdu_order(st, et):
+    start_exec_seconds = date_util.getCurrentSecondTime()
+    total_order_list = ()
+    account_list = platform_config_util.get_zhangdu_account_list()
+
+    executor = ProcessPoolExecutor(max_workers=5)
+
+    futures = []
+    for account in account_list:
+        future = executor.submit(get_zhangdu_order_task, st, et, account)
+        futures.append(future)
+    executor.shutdown(True)
+
+    for future in futures:
+        order_list = future.result()
+        if len(order_list) > 0:
+            total_order_list = order_list + total_order_list
+
+    print('掌读订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
+    return total_order_list
+
+
+def get_zhangdu_order_task(st, et, account):
+    order_list = ()
+    url = 'https://api.zhangdu520.com/channel/getorder'
+
+    uid = account[0]
+    appsecert = account[1]
+    channel = account[2]
+    timestamp = int(time.time())
+    sign = md5(str(uid) + '&' + appsecert + '&' + str(timestamp))
+    starttime = st
+    timespace = 90 * 3600 * 24
+    endtime = min(et, st + timespace)
+
+    for x in range((et - st) // timespace + 1):  # 分时段
+        if x > 0:
+            print('掌读跨天数查询:', x)
+
+        params = {
+            'uid': uid,
+            'timestamp': timestamp,
+            'sign': sign,
+            'starttime': starttime,
+            'endtime': endtime
+        }
+        response_result_json = requests.get(url=url, params=params).json()
+        pageCount = response_result_json['data']['pageCount']
+        if pageCount == 0:
+            continue
+
+        for page in range(1, pageCount + 1):  # 分页
+            params = {
+                'uid': uid,
+                'timestamp': timestamp,
+                'sign': sign,
+                'starttime': starttime,
+                'endtime': endtime,
+                'page': page
+            }
+            list2 = requests.get(url=url, params=params).json()
+            if 'data' in list2.keys():
+                for b in list2['data']['list']:
+                    if b['status'] == '1':
+                        c = {}
+                        c['amount'] = b['amount']
+                        c['channel_id'] = uid
+                        c['order_id'] = str(b['orderno'])
+                        c['order_time'] = b['ctime']
+                        c['user_id'] = b['openid']
+                        c['platform'] = '掌读'
+                        c['channel'] = channel
+                        c['reg_time'] = b['regtime']
+                        c['from_novel'] = ''
+                        c['stage'] = ''
+                        c['date'] = ((int(b['ctime']) + 8 * 3600) // 86400) * 86400 - 8 * 3600
+
+                        x = sorted(c.items(), key=lambda item: item[0])
+                        x = dict(x)
+                        x = tuple(x.values())
+                        order_list = order_list + ((x),)
+
+        starttime = starttime + timespace
+        endtime = min(et, starttime + timespace)
+
+    return order_list
+
+
+# 花生
+@robust_util.catch_exception
+def get_huasheng_order(st, et):
+    start_exec_seconds = date_util.getCurrentSecondTime()
+    total_order_list = ()
+    account_list = platform_config_util.get_huasheng_account_list()
+
+    executor = ProcessPoolExecutor(max_workers=5)
+
+    futures = []
+    for account in account_list:
+        url = 'https://vip.rlcps.cn/api/getMerchants'
+        apiKey = str(account[0])
+        apiSecurity = account[1]
+        timestamp = str(int(time.time()))
+        sign = md5(apiKey + timestamp + apiSecurity).upper()
+        params = {
+            'apiKey': apiKey,
+            'apiSecurity': apiSecurity,
+            'timestamp': timestamp,
+            'sign': sign
+        }
+        response_result_json = requests.post(url, params).json()
+
+        if 'data' not in response_result_json.keys():
+            # print('花生账号【{apiKey}】本次请求数据为空,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json))
+            continue
+
+        for merchant in response_result_json['data']:
+            future = executor.submit(get_huasheng_order_task, st, et, account, merchant)
+            futures.append(future)
+
+    executor.shutdown(True)
+
+    for future in futures:
+        order_list = future.result()
+        if len(order_list) > 0:
+            total_order_list = order_list + total_order_list
+
+    print('花生订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
+    return total_order_list
+
+
+def get_huasheng_order_task(st, et, account, merchant):
+    order_list = ()
+
+    apiKey = str(account[0])
+    apiSecurity = account[1]
+    stage = account[2]
+    timestamp = str(int(time.time()))
+
+    order_url = 'https://vip.rlcps.cn/api/orderList'
+    merchant_id = merchant['merchant_id']
+    merchant_name = merchant['merchant_name']
+    start_time = st
+    limit = 500
+
+    for i in range((et - st) // 86400 + 1):
+        page = 1
+
+        while True:
+            date = time.strftime("%Y-%m-%d", time.localtime(start_time))
+            sign = md5(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper()
+            order_params = {
+                'apiKey': apiKey,
+                'apiSecurity': apiSecurity,
+                'timestamp': timestamp,
+                'date': date,
+                'merchant_id': merchant_id,
+                'sign': sign,
+                'page': page,
+                'limit': limit
+            }
+            response_result_json = requests.post(order_url, order_params).json()
+
+            if 'data' not in response_result_json.keys() or len(response_result_json['data']) == 0:
+                # print('花生账号【{key}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据为空,响应报文【{result}】'
+                #       .format(key=apiKey, merchant_id=merchant_id, merchant_name=merchant_name,
+                #               result=response_result_json))
+                break
+
+            total_count = response_result_json['count']
+            order_item_list = response_result_json['data']
+
+            for order_item in order_item_list:
+                if order_item['order_status'] == 1:  # 1为已支付
+                    order = {}
+                    ##dtime = datetime.datetime.strptime(order_item['pay_at'],"%Y-%m-%d")
+                    ##order['date']= ((int(time.mktime(dtime.timetuple()))+8*3600)//86400)*86400-8*3600
+                    order['user_id'] = order_item['openid']
+                    order['order_id'] = order_item['trans_id']
+                    order['order_time'] = order_item['pay_at']
+                    order['reg_time'] = order_item['join_at']
+                    # TODO 花生的时间需要统一
+                    order['date'] = (start_time + 8 * 3600) // 86400 * 86400 - 8 * 3600
+                    order['channel'] = merchant_name
+                    order['channel_id'] = merchant_id
+                    order['platform'] = '花生'
+                    order['stage'] = stage
+                    order['from_novel'] = order_item['book_name']
+                    order['amount'] = order_item['amount']
+
+                    order = sorted(order.items(), key=lambda item: item[0])
+                    order = dict(order)
+                    order = tuple(order.values())
+                    order_list = order_list + ((order),)
+
+            if int(page) >= math.ceil(total_count / int(limit)):
+                break
+
+            # print('花生账号【{key}】, 渠道【{merchant_id}:{merchant_name}】当前页【{page}】,本次查询订单数【{total_count}】,即将查询下一页'
+            #       .format(key=apiKey, merchant_id=merchant_id, merchant_name=merchant_name, page=page, total_count=total_count))
+
+            page = page + 1
+
+        start_time = start_time + 86400  # 天数加1
+
+    return order_list
+
+
+# 掌中云
+@robust_util.catch_exception
+def get_zhangzhongyun_order(st, et):
+    start_exec_seconds = date_util.getCurrentSecondTime()
+    total_order_list = ()
+    account_list = al.zzy_account_list
+    # account_list = platform_util.get_zhangzhongyun_account_list()
+    # account_list = [['1108701f1d6','0f9c0f8429d1a16a8a78c2306e7a4db3','清勇7月']]
+    # account_list = [['1109295d56c','9bb955186597882ac473e86ba4576158','趣程20期']]
+
+    executor = ProcessPoolExecutor(max_workers=5)
+
+    futures = []
+    for account in account_list:
+        url = 'https://openapi.818tu.com/partners/channel/channels/list?'
+        key = account[0]
+        secert = account[1]
+        sign = md5(secert + 'key=' + key)
+        params = 'key=' + key + '&sign=' + sign
+        response_result_json = requests.get(url + params).json()  # 获取子渠道列表
+
+        if 'data' not in response_result_json.keys():
+            # print('掌中云账号【{key}】本次请求数据为空,响应报文【{result}】'.format(key=key, result=response_result_json))
+            continue
+
+        items = response_result_json['data']['items']
+        for channel in items:
+            # 获取channel_id 后逐个拉取历史orders
+            future = executor.submit(get_zhangzhongyun_order_task, st, et, account, channel)
+            futures.append(future)
+
+    executor.shutdown(True)
+
+    for future in futures:
+        order_list = future.result()
+        if len(order_list) > 0:
+            total_order_list = order_list + total_order_list
+
+    print('掌中云订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
+    return total_order_list
+
+
+def get_zhangzhongyun_order_task(st, et, account, channel):
+    # 掌中云的时间格式比较特殊,转换下
+    st = platform_config_util.get_zhangzhongyun_format_time(st)
+    et = platform_config_util.get_zhangzhongyun_format_time(et)
+
+    order_list = ()
+
+    key = account[0]
+    secert = account[1]
+    stage = account[2]
+
+    order_url = 'https://openapi.818tu.com/partners/channel/orders/list?'
+    channel_id = channel['id']
+    channel_name = channel['nickname']
+    status = str(1)
+    page = str(1)
+    per_page = str(1000)
+    get_time = st
+    limit_time = et
+    gte = parse.urlencode({'created_at[gte]': get_time})  # gte就是ge 大于等于开始时间
+    lt = parse.urlencode({'created_at[lt]': limit_time})  # 小于 结束时间
+
+    while True:
+        sign = md5(secert + 'channel_id=' + str(
+            channel_id) + '&created_at[gte]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str(
+            page) + '&per_page=' + per_page + '&status=' + status)
+        params = 'channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str(
+            page) + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign
+
+        response_result_json = requests.get(order_url + params).json()
+
+        if 'data' not in response_result_json.keys():
+            # print('掌中云账号【{key}】, 渠道【{channel_id}:{channel_name}】本次请求数据为空,响应报文【{result}】'
+            #       .format(key=key, channel_id=channel_id, channel_name=channel_name, result=response_result_json))
+            break
+
+        total_count = response_result_json['data']['count']  # 总数量
+        order_item_list = response_result_json['data']['items']  # 订单列表
+
+        for order_item in order_item_list:
+            order = {}
+            order['user_id'] = str(order_item['member']['openid'])
+            order['channel'] = channel_name
+            order['reg_time'] = order_item['member']['created_at']
+            order['channel_id'] = channel_id
+            order['amount'] = round(order_item['price'] / 100, 2)
+            order['order_id'] = str(order_item['id'])
+            order['order_time'] = order_item['created_at']
+            order['platform'] = '掌中云'
+            order['stage'] = stage
+            dtime = datetime.datetime.strptime(order_item['created_at'][0:10], "%Y-%m-%d")
+            order['date'] = ((int(time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
+
+            if str(order_item['from_novel_id']) != 'None':
+                order['from_novel'] = order_item['from_novel']['title']
+            else:
+                order['from_novel'] = 'None'
+
+            x = sorted(order.items(), key=lambda item: item[0])
+            x = dict(x)
+            x = tuple(x.values())
+            order_list = order_list + ((x),)
+
+        if int(page) >= math.ceil(total_count / int(per_page)):
+            break
+
+        # print('掌中云账号【{key}】, 渠道【{channel_id}:{channel_name}】当前页【{page}】,本次查询订单数【{total_count}】,即将查询下一页'
+        #       .format(key=key, channel_id=channel_id, channel_name=channel_name, page=page, total_count=total_count))
+
+        page = int(page) + 1
+
+    return order_list
+
+
+# 悠书阁
+@robust_util.catch_exception
+def get_youshuge_order(st, et):
+    start_exec_seconds = date_util.getCurrentSecondTime()
+    total_order_list = ()
+    account_list = platform_config_util.get_youshuge_account_list()
+
+    executor = ProcessPoolExecutor(max_workers=5)
+
+    futures = []
+    for account in account_list:
+        future = executor.submit(get_youshuge_order_task, st, et, account)
+        futures.append(future)
+    executor.shutdown(True)
+
+    for future in futures:
+        order_list = future.result()
+        if len(order_list) > 0:
+            total_order_list = order_list + total_order_list
+
+    print('悠书阁订单数量:', len(total_order_list), '执行时长(秒):', date_util.getCurrentSecondTime() - start_exec_seconds)
+    return total_order_list
+
+
+def get_youshuge_order_task(st, et, account):
+    order_list = ()
+    url = 'https://novel.youshuge.com/v2/open/orders'
+    # 超过100条就需要分页,别问我为什么知道,看代码看出来的
+    max_page_size = 100
+
+    host_name = account[0]
+    channel_id = int(account[1])
+    secert_key = account[2]
+    channel = account[3]
+    stage = account[4]
+
+    timestamp = int(time.time())
+    start_date = time.strftime("%Y-%m-%d", time.localtime(st))
+    end_date = time.strftime("%Y-%m-%d", time.localtime(et))
+    page = 1
+    str1 = 'channel_id=' + str(channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
+        page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
+    sign = md5(str1).upper()
+    data = {
+        'sign': sign,
+        'host_name': host_name,
+        'time': timestamp,
+        'channel_id': channel_id,
+        'page': page,
+        'pay_status': 1,
+        'start_date': start_date,
+        'end_date': end_date
+    }
+    respone = requests.post(url, data)
+    if respone.status_code == 400:
+        print('respone', respone)
+
+    result_json = respone.json()
+    first_page_order = build_ysg_order_data(channel, channel_id, result_json, stage)
+    order_list = order_list + first_page_order
+    if len(first_page_order) == 0:
+        return order_list
+
+    total_count = result_json['data'][0]['count']
+    if total_count > max_page_size:
+        for i in range((total_count - 1) // max_page_size + 1):
+            timestamp = int(time.time())
+            str1 = 'channel_id=' + str(
+                channel_id) + '&end_date=' + end_date + '&host_name=' + host_name + '&page=' + str(
+                page) + '&pay_status=1' + '&start_date=' + start_date + '&time=' + str(timestamp) + '&key=' + secert_key
+            sign = md5(str1).upper()
+            data2 = {
+                'sign': sign,
+                'host_name': host_name,
+                'time': timestamp,
+                'channel_id': channel_id,
+                'page': page,
+                'pay_status': 1,
+                'start_date': start_date,
+                'end_date': end_date
+            }
+            r2 = requests.post(url, data2).json()
+
+            order_list = order_list + build_ysg_order_data(channel, channel_id, r2, stage)
+            page = page + 1
+
+    return order_list
+
+
+def build_ysg_order_data(channel, channel_id, result_json, stage):
+    order_list = ()
+    if 'data' in result_json.keys():
+        data = result_json['data']
+        if len(data) > 0:
+            for x in data:
+                y = {}
+                dtime = datetime.datetime.strptime(x['create_time'][0:10], "%Y-%m-%d")
+                y['date'] = ((int(
+                    time.mktime(dtime.timetuple())) + 8 * 3600) // 86400) * 86400 - 8 * 3600
+                y['order_id'] = x['order_num']
+                y['amount'] = round(int(x['price']) / 100, 2)
+                y['order_time'] = x['create_time']
+                y['channel'] = channel
+                y['from_novel'] = x['book_name']
+                y['stage'] = stage
+                y['user_id'] = x['openid']
+                y['channel_id'] = channel_id
+                y['platform'] = '悠书阁'
+                y['reg_time'] = x['reg_time']
+
+                y = sorted(y.items(), key=lambda item: item[0])
+                y = dict(y)
+                y = tuple(y.values())
+                order_list = order_list + ((y),)
+    return order_list
+
+
+@robust_util.catch_exception
+def nor():
+    return 1
+
+
+@robust_util.catch_exception
+def throw_exception():
+    return 5 / 0

+ 44 - 0
dgp/tests/util/robust_util.py

@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+__title__ = '鲁棒性工具类'
+
+@Time    : 2020/9/30 14:51
+@Author  : zhengwangeng
+@Software: PyCharm
+
+# code is far away from bugs with the god animal protecting
+    I love animals. They taste delicious.
+              ┏┓      ┏┓
+            ┏┛┻━━━┛┻┓
+            ┃      ☃      ┃
+            ┃  ┳┛  ┗┳  ┃
+            ┃      ┻      ┃
+            ┗━┓      ┏━┛
+                ┃      ┗━━━┓
+                ┃  神兽保佑    ┣┓
+                ┃ 永无BUG!   ┏┛
+                ┗┓┓┏━┳┓┏┛
+                  ┃┫┫  ┃┫┫
+                  ┗┻┛  ┗┻┛
+"""
+
+import sys
+
+import traceback
+
+
+# 异常处理装饰器
+def catch_exception(actual_do):
+    def add_robust(*args, **keyargs):
+        try:
+            return actual_do(*args, **keyargs)
+        except Exception as err:
+            # print('Error execute: %s' % actual_do.__name__)
+            info = sys.exc_info()[2].tb_frame.f_back
+            temp = "exception:filename:{}\tlines:{}\tfuncation:{}\terror:{}"
+            # print(temp.format(info.f_code.co_filename, info.f_lineno, actual_do.__name__, repr(err)))
+            traceback.print_exc()
+
+    return add_robust

+ 0 - 10
todo_list.md

@@ -1,10 +0,0 @@
-# 后续优化点
-
-#### 配置优化,迁移到配置文件中
-* 数据库连接需要单独配置
-* 账号,token配置也需要独立成配置文件,方便实时获取最新的数据,而不需要重启脚本
-* 多线程,异步获取订单数据,目前获取数据都是单线程,太慢了,掌中云2020-09-25的订单数据共12106条,执行时间为529秒,需要特别优化。
-
-
-
-