Pārlūkot izejas kodu

MOD:更改为游戏处理

cxyu 3 gadi atpakaļ
vecāks
revīzija
9a18cba0db

+ 0 - 156
app/api_data/platform_order/QiYue.py

@@ -1,156 +0,0 @@
-"""
-七悦书城数据
-"""
-import time
-from model import ComUtils
-import requests
-import json
-from model.DataBaseUtils import MysqlUtils
-from model.DateUtils import DateUtils
-from model.ComUtils import *
-import math
-from model.DateUtils import DateUtils
-import logging
-from urllib import parse
-from model.DingTalkUtils import DingTalkUtils
-from app.api_data.platform_order.order_util import *
-
-# logging.getLogger().setLevel(logging.WARNING)
-db = MysqlUtils()
-du = DateUtils()
-
-
-# 获取七悦订单数据
-def get_qiyue_order_task(start, end, account):
-    """一分钟请求60次的限制"""
-    order_list = []
-    # 参数
-    order_url = "https://a-api.zhangwenwenhua.com" + "/v1/orders"
-    stage = account[0]
-    token = account[1]
-    size = 50
-    freq = 0
-    for date in du.getDateLists(start, end):
-
-        page = 1
-        while True:
-            timestamp = int(time.time())
-            url = order_url + "?" + "token=" + str(token) + "&timestamp=" + str(timestamp) + "&page=" + str(
-                page) + "&size=" + str(size) + "&date=" + date
-            rsp = requests.get(url=url)
-            response_result_json = rsp.json()
-            # print(response_result_json)
-
-            # 无限制了
-            # freq += 1
-            # if freq == 59:
-            #     print("一分钟请求60次的限制 等待中")
-            #     time.sleep(61)
-            #     freq = 0
-
-            code = response_result_json['code']
-            if code != 0:
-                print(stage, '七悦充值接口异常:', response_result_json)
-                break
-
-            result_data = response_result_json['data']
-
-            total = result_data['total']
-            if total <= 0:
-                break
-
-            order_item_list = result_data['data']
-            for x in order_item_list:
-                create_time = DateUtils.stamp_to_str(x['create_time'], '%Y-%m-%d %H:%M:%S')
-                reg_time = DateUtils.stamp_to_str(x['user_create_time'], '%Y-%m-%d %H:%M:%S')
-
-                order_list.append((
-                    create_time[:10],
-                    stage,
-                    '七悦',
-                    x['wechat_app_name'],  # 公众号名称
-                    x['channel_id'],
-                    x['user_open_id'],
-                    create_time,
-                    reg_time,  # 用户注册时间
-                    x['money'],
-                    x['book_name'],
-                    x['id'],
-                    x['state'],
-                    x['user_id'],
-                    x['wechat_app_id'],
-                    x['book_keywords'],
-                    x['type'],
-                    x['trade_no'],
-                    x['transaction_no']
-                )
-                )
-            next_page_url = result_data['next_page_url']
-            if next_page_url is None:
-                break
-            page += 1
-        # print(len(order_list))
-    print(f'{stage} [{start}-{end}] 有订单{order_list.__len__()}')
-    # print(order_list)
-    if order_list.__len__() > 0:
-        save_order2(order_list)
-
-
-# 获取七悦用户信息
-def get_qiyue_user_info(account, date=None):
-    order_url = "https://a-api.zhangwenwenhua.com" + "/v1/users"
-    stage = account[0]
-    token = account[1]
-    page = 1
-    size = 100
-    freq = 0
-    dt = "&date=" + date if date else ''
-
-    li = []
-    while True:
-        timestamp = int(time.time())
-        url = order_url + "?" + "token=" + str(token) + "&timestamp=" + str(timestamp) + "&page=" + str(
-            page) + "&size=" + str(size) + "&date=" + dt
-        r = requests.get(url=url).json()
-        print(r)
-
-        freq += 1
-        if freq == 59:
-            print("一分钟请求60次的限制 等待中")
-            time.sleep(61)
-            freq = 0
-
-        if r['code'] != 0:
-            print(r)
-            continue
-            # DingTalkUtils.send('七悦用户拉取接口错误'+r['msg'])
-
-        for i in r['data']['data']:
-            li.append(('七悦',
-                       i['id'],
-                       i['openid'],
-                       i['channel_id'],
-                       i['wechat_app_name'],
-                       i['wechat_app_id'],
-                       i['kandian'],
-                       i['free_kandian'],
-                       i['vip_endtime']
-
-                       ))
-
-        if len(r['data']['data']) < size:
-            break
-        page += 1
-    print(f"{stage} 有粉丝{len(li)}")
-    if len(li) > 0:
-        sql = "replace into platform_user_info values (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
-        db.quchen_text.executeMany(sql, li)
-
-
-if __name__ == '__main__':
-    st = '2021-06-07'
-    et = '2021-06-07'
-
-    for account in get_account("七悦"):
-        get_qiyue_order_task(st, et, account)
-        # get_qiyue_user_info(account)

+ 0 - 100
app/api_data/platform_order/audio_qiyue.py

@@ -1,100 +0,0 @@
-import time
-import requests
-from model.DataBaseUtils import MysqlUtils
-from model.DateUtils import DateUtils
-from queue import Queue
-from app.api_data.platform_order.order_util import save_order
-
-
-class AudioQiyue:
-    def __init__(self):
-        self.db_quchen_text = MysqlUtils().quchen_text
-        self.date_action = DateUtils()
-        self.page_size = 50  # 一次请求的量级
-        # 限制一分钟120次请求,设置为本地限制70s 120次请求
-        self.time_sing_size = 120  # 时间队列长度
-        self.time_sing_ms = 70  # 时间队列,第一对于最后的时间差值.
-        # 暂时没有限制
-        self.time_sing_ms = 0.1
-
-    def control_speed(self, time_queue):
-        if time_queue.full():
-            time_s = time_queue.get()
-            differ_time_num = time.time() - time_s
-            if differ_time_num < self.time_sing_ms:
-                print('访问速度过快,进行休眠,{}'.format(self.time_sing_ms - differ_time_num))
-                time.sleep(self.time_sing_ms - differ_time_num)
-            else:
-                time_queue.put(time.time())
-        else:
-            time_queue.put(time.time())
-
-    def get_order(self, start, end, account):
-
-        order_list = []
-        # 参数
-        order_url = "https://a-o-api.qiyuept.com" + "/v1/orders"
-        stage = account[0]
-        token = account[1]
-        time_sign = Queue(self.time_sing_size)
-        for date in self.date_action.getDateLists(start, end):
-            page = 1
-            while True:
-                timestamp = int(time.time())
-                self.control_speed(time_queue=time_sign)
-
-                url = order_url + "?" + "token=" + str(token) + "&timestamp=" + str(timestamp) + "&page=" + str(
-                    page) + "&size=" + str(self.page_size) + "&date=" + date
-                rsp = requests.get(url=url)
-                response_result_json = rsp.json()
-                # print(response_result_json)
-
-                code = response_result_json['code']
-                if code != 0:
-                    print(stage, '七悦充值接口异常:', response_result_json)
-                    break
-
-                result_data = response_result_json['data']
-                total = result_data['total']
-                if total <= 0:
-                    break
-
-                order_item_list = result_data['data']
-                for x in order_item_list:
-                    create_time = DateUtils.stamp_to_str(x['create_time'], '%Y-%m-%d %H:%M:%S')
-                    reg_time = DateUtils.stamp_to_str(x['user_create_time'], '%Y-%m-%d %H:%M:%S')
-                    order_list.append((
-                        create_time[:10],
-                        stage,
-                        '七悦有声',
-                        x['wechat_app_name'],  # 公众号名称
-                        x['channel_id'],
-                        x['user_open_id'],
-                        create_time,
-                        reg_time,  # 用户注册时间
-                        x['money'],
-                        x['book_name'],
-                        x['transaction_no'] if x['transaction_no'] != '0' else x['trade_no'],  # 订单id
-                        x['state']
-                        # , x['user_id']
-                    )
-                    )
-                next_page_url = result_data['next_page_url']
-                if next_page_url is None:
-                    break
-                page += 1
-            # print(len(order_list))
-
-        print(f'{stage} [{start}~{end}] 有订单{order_list.__len__()}')
-        if order_list.__len__() > 0:
-            # print(order_list)
-            save_order(order_list)
-
-
-if __name__ == "__main__":
-    account = ['趣程15期',
-               'eyJpdiI6Ilc0dmJWTjlHZnpJVVUwM3Q3dlc2aWc9PSIsInZhbHVlIjoiNFFvbXJISzBoTExoa0NJMmtXd0FMUT09IiwibWFjIjoiNTY1YjA3MTVlMzliYzg2MzcxMjZjOTRkYTMyY2FlZmJmNDUyZjYyZGEzM2I4MTMxNDNhMTIwNTIzZWViZjMyMSJ9']
-    start = '2021-04-01'
-    end = '2021-04-22'
-    # AudioQiyue().get_order(start=start, end=end, account=account)
-    AudioQiyue().run(start=start, end=end)

+ 0 - 230
app/api_data/platform_order/get_order.py

@@ -1,230 +0,0 @@
-from app.api_data.platform_order.order_util import *
-from model.DataBaseUtils import MysqlUtils
-from concurrent.futures import ThreadPoolExecutor
-from model.DingTalkUtils import DingTalkDecorators
-from app.api_data.platform_order.audio_qiyue import AudioQiyue
-from app.api_data.platform_order.youshuge import get_youshuge_order_task
-from app.api_data.platform_order.yuewen import get_yuewen_order_task
-from app.api_data.platform_order.yuewen_fast_app import get_yuewen_fast_app_order_task
-from app.api_data.platform_order.yangguang import yangguang
-from app.api_data.platform_order.QiYue import get_qiyue_order_task
-from app.api_data.platform_order.wandu import get_wandu_order, get_all_channel
-import logging
-
-db = MysqlUtils()
-
-
-def get_new_account(plactform):
-    data = db.quchen_text.getData(
-        f"select text from order_account_text where platform='{plactform}' and create_time>='{du.get_n_days(-1)}'")
-    new_data = []
-    for i in data:
-        new_data.append(i[0].replace('\n', '').split(","))
-    return new_data
-
-
-@DingTalkDecorators("花生")
-def huasheng(start=None, end=None):
-    logging.info('花生订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-    accounts = get_account("花生")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'花生有账号{len(accounts)}个')
-    # 花生有请求限制 不用多线程
-    for account in accounts:
-        get_hs_order_task(start, end, account)
-    logging.info('花生订单数据拉取,结束')
-
-
-@DingTalkDecorators("七悦")
-def qiyue(start=None, end=None, new=None):
-    logging.info('七悦订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-    accounts = get_account("七悦")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'七悦有账号{len(accounts)}个')
-    for account in accounts:
-        get_qiyue_order_task(start, end, account)
-    logging.info('七悦订单数据拉取,结束')
-
-
-@DingTalkDecorators("万读")
-def wandu(start=None, end=None, new=None):
-    logging.info('万读订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-    accounts = get_account("万读")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'万读有账号{len(accounts)}个')
-    channel_info = get_all_channel()
-    for account in accounts:
-        get_wandu_order(start, end, account, channel_info)
-    logging.info('万读订单数据拉取,结束')
-
-
-@DingTalkDecorators("文鼎")
-def wending(start=None, end=None, new=None):
-    logging.info('文鼎订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    accounts = get_account("文鼎") if new is None else get_new_account('文鼎')
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'文鼎有账号{len(accounts)}个')
-    for account in accounts:
-        get_wd_order_task(start, end, account)
-    logging.info('文鼎订单数据拉取,结束')
-
-
-@DingTalkDecorators("国风")
-def guofeng(start=None, end=None, new=None):
-    if start:
-        start = start if start > '2021-07-09' else '2021-07-09'
-    logging.info('国风订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    accounts = get_account("国风") if new is None else get_new_account('国风')
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'国风有账号{len(accounts)}个')
-    for account in accounts:
-        get_gf_order_task(start, end, account)
-    logging.info('国风订单数据拉取,结束')
-
-
-@DingTalkDecorators("掌读")
-def zhangdu(start=None, end=None, new=None):
-    logging.info('掌读订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    accounts = get_account("掌读")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'掌读有账号{len(accounts)}个')
-    for account in accounts:
-        get_zd_order_task(start, end, account)
-    logging.info('掌读订单数据拉取,结束')
-
-
-@DingTalkDecorators("掌中云")
-def zhangzhongyun(start=None, end=None, new=None):
-    logging.info('掌中云订单数据拉取,开始')
-    if start is None:
-        start = du.getNow()
-        end = du.get_n_days(1)
-
-    executor = ThreadPoolExecutor(max_workers=5)
-    accounts = get_account("掌中云")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'掌中云有账号{len(accounts)}个')
-    for account in accounts:
-        executor.submit(
-            get_zzy_order_task, start, end, account)
-    executor.shutdown()
-    logging.info('掌中云订单数据拉取,结束')
-
-
-@DingTalkDecorators("阅文")
-def yueweng(start=None, end=None):
-    logging.info('阅文订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    executor = ThreadPoolExecutor(max_workers=5)
-    accounts = get_account("阅文")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'阅文有账号{len(accounts)}个')
-    for account in accounts:
-        executor.submit(get_yuewen_order_task, start, end, account)
-    executor.shutdown()
-    logging.info('阅文订单数据拉取,结束')
-
-
-@DingTalkDecorators("阅文快应用")
-def yueweng_fastapp(start=None, end=None):
-    logging.info('阅文快应用订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    executor = ThreadPoolExecutor(max_workers=5)
-    accounts = get_account("阅文快应用")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'阅文快应用有账号{len(accounts)}个')
-    for account in accounts:
-        executor.submit(get_yuewen_fast_app_order_task, start, end, account)
-    executor.shutdown()
-    logging.info('阅文快应用订单数据拉取,结束')
-
-
-@DingTalkDecorators("七悦有声")
-def qiyueyousheng(start=None, end=None):
-    logging.info('七悦有声订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-    accounts = get_account("七悦有声")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'七悦有声有账号{len(accounts)}个')
-    for account in accounts:
-        AudioQiyue().get_order(start, end, account)
-    logging.info('七悦有声订单数据拉取,结束')
-
-
-@DingTalkDecorators("悠书阁")
-def youshuge(start=None, end=None):
-    logging.info('悠书阁订单数据拉取,开始')
-    if start is None:
-        start = end = du.getNow()
-
-    executor = ThreadPoolExecutor(max_workers=5)
-    accounts = get_account("悠书阁")
-    if len(accounts) == 0:
-        return
-    else:
-        logging.info(f'悠书阁有账号{len(accounts)}个')
-    for account in accounts:
-        executor.submit(get_youshuge_order_task, start, end, account)
-    executor.shutdown()
-    logging.info('悠书阁订单数据拉取,结束')
-
-
-def yestoday():
-    st = et = du.get_n_days(-1)
-    huasheng(st, et)
-    qiyue(st, et)
-    qiyueyousheng(st, et)
-    wending(st, et)
-    zhangdu(st, et)
-    zhangzhongyun(st, et)
-    yueweng(st, et)
-    youshuge(st, et)
-    yangguang(st, et)
-
-
-if __name__ == '__main__':
-    # zhangzhongyun()
-    # yangguang()
-    # huasheng("2021-04-12",'2021-05-10')
-    yueweng(du.get_n_days(-3), du.get_n_days(0))
-    # qiyue('2020-11-01', '2021-06-03')

+ 0 - 539
app/api_data/platform_order/order_util.py

@@ -1,539 +0,0 @@
-import time
-from model import ComUtils
-import requests
-import json
-from model.DataBaseUtils import MysqlUtils
-from model.DateUtils import DateUtils
-from model.ComUtils import *
-import math
-from model.DateUtils import DateUtils
-import logging
-from urllib import parse
-from model.DingTalkUtils import DingTalkUtils
-
-# logging.getLogger().setLevel(logging.WARNING)
-db = MysqlUtils()
-du = DateUtils()
-
-
-def get_hs_order_task(start, end, account):
-    url = 'https://vip.rlcps.cn/api/getMerchants'
-    apiKey = str(account[0])
-    apiSecurity = account[1]
-    timestamp = str(int(time.time()))
-    sign = md5(apiKey + timestamp + apiSecurity).upper()
-    params = {
-        'apiKey': apiKey,
-        'apiSecurity': apiSecurity,
-        'timestamp': timestamp,
-        'sign': sign
-    }
-    response_result_json = requests.post(url, params).json()
-    if 'data' not in response_result_json.keys():
-        print('花生账号【{apiKey}】本次请求数据异常,响应报文【{result}】'.format(apiKey=apiKey, result=response_result_json))
-
-    channel_data = response_result_json['data']
-
-    # print(f"{account[2]} 有channel{len(channel_data)}个:{str([i['merchant_name'] for i in channel_data])}")
-
-    li = []
-    for merchant in channel_data:
-        orders = get_huasheng_order(start, end, account, merchant)
-        li.extend(orders)
-
-    if len(li) > 0:
-        print(f"花生账号:{account[2]} 有order{len(li)}个")
-        # print(li)
-        save_order(li)
-
-
-def get_huasheng_order(start, end, account, merchant):
-    li = []
-    apiKey = str(account[0])
-    apiSecurity = account[1]
-    stage = account[2]
-    timestamp = str(int(time.time()))
-
-    order_url = 'https://vip.rlcps.cn/api/orderList'
-    merchant_id = merchant['merchant_id']
-    merchant_name = merchant['merchant_name']
-    limit = 500
-
-    for date in du.getDateLists(start, end):
-        page = 1
-        while True:
-            sign = md5(apiKey + date + str(merchant_id) + timestamp + apiSecurity).upper()
-            order_params = {
-                'apiKey': apiKey,
-                'apiSecurity': apiSecurity,
-                'timestamp': timestamp,
-                'date': date,
-                'merchant_id': merchant_id,
-                'sign': sign,
-                'page': page,
-                'limit': limit
-            }
-            r = requests.post(order_url, order_params)
-            response_result_json = r.json()
-            if response_result_json['code'] != 0:
-                print(response_result_json)
-                DingTalkUtils().send('花生订单接口异常' + r.text)
-
-            if 'data' not in response_result_json.keys():
-                print('花生账号【{key}】, 查询时间【{date}】, 渠道【{merchant_id}:{merchant_name}】本次请求数据异常,响应报文【{result}】'
-                      .format(key=apiKey, date=date, merchant_id=merchant_id, merchant_name=merchant_name,
-                              result=response_result_json))
-                break
-
-            if len(response_result_json['data']) == 0:
-                break
-
-            order_item_list = response_result_json['data']
-            if len(order_item_list) == 0:
-                break
-            for i in order_item_list:
-                li.append(
-                    (i['request_at'][:10],
-                     stage,
-                     '花生',
-                     merchant_name,
-                     merchant_id,
-                     i['openid'],
-                     i['request_at'],
-                     i['join_at'],
-                     i['amount'],
-                     i['book_name'],
-                     i['trans_id'] if i['trans_id'] != '' else i['order_num'],
-                     2 if i['order_status'] == 1 else 1
-                     # i['user_id']
-
-                     )
-                )
-
-            if len(order_item_list) < limit:
-                break
-            else:
-                page = page + 1
-    return li
-
-
-def save_hs_data(data):
-    sql = 'replace INTO quchen_text.ods_order ' \
-          ' VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
-    db.quchen_text.executeMany(sql, data)
-
-    # print(order_list)
-
-
-def save_order(order_list):
-    db.quchen_text.executeMany("""replace into ods_order(date,stage,platform,channel,channel_id,user_id,
-                               order_time,reg_time,amount,from_novel,order_id,status) 
-                               values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", order_list)
-    print("入库成功")
-
-
-def save_order2(order_list):
-    db.quchen_text.executeMany("""replace into ods_order(date,stage,platform,channel,channel_id,user_id,
-                               order_time,reg_time,amount,from_novel,order_id,status,
-                               platform_user_id,wechat_app_id,book_tags,order_type,trade_no,transaction_no) 
-                               values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", order_list)
-    print("入库成功")
-
-
-def get_wd_account_siteid_list(account):
-    url = 'https://bi.reading.163.com/dist-api/siteList'
-
-    consumerkey = account[0]
-    secretkey = account[1]
-    stage = account[3]
-    timestamp = int(time.time() * 1000)
-
-    siteid_params = {
-        "consumerkey": consumerkey,
-        'secretkey': secretkey,
-        'timestamp': timestamp,
-    }
-    sorted_data = sorted(siteid_params.items(), reverse=False)
-    s = ""
-    for k, v in sorted_data:
-        s = s + str(k) + "=" + str(v)
-    sign = md5(s).lower()
-    siteid_params['sign'] = sign
-
-    consumerkey = siteid_params['consumerkey']
-    timestamp = siteid_params['timestamp']
-    parameter = 'consumerkey=' + str(consumerkey) + '&timestamp=' + str(timestamp) + '&sign=' + str(sign)
-    get_url = url + "?" + parameter
-
-    while True:
-        r = requests.get(url=get_url)
-        if r.status_code == 200:
-            break
-
-    try:
-        id_key_list = r.json()['data']
-    except:
-        return []
-    mpid_list = []
-    try:
-        for id_key_val in id_key_list:
-            mpid = dict(id_key_val)["mpId"]
-            mpid_list.append(mpid)
-    except Exception as e:
-        print(stage, '站点查询返回结果:', r.json())
-    return mpid_list
-
-
-def get_wending_json_object(url, params):
-    params['timestamp'] = int(time.time() * 1000)
-    sorted_data = sorted(params.items(), reverse=False)
-    s = ""
-    for k, v in sorted_data:
-        s = s + str(k) + "=" + str(v)
-    sign = md5(s).lower()
-    params['sign'] = sign
-
-    consumerkey = params['consumerkey']
-    secretkey = params['secretkey']
-    timestamp = params['timestamp']
-    siteid = params['siteid']
-    pageSize = params['pageSize']
-    starttime = params['starttime']
-    endtime = params['endtime']
-    page = params['page']
-    ## +'&secretkey='+str(secretkey)
-    parameter = 'consumerkey=' + str(consumerkey) + '&timestamp=' + str(timestamp) + '&siteid=' + str(
-        siteid) + '&pageSize=' + str(pageSize) \
-                + '&starttime=' + str(starttime) + '&endtime=' + str(endtime) + '&page=' + str(page) + '&sign=' + str(
-        sign)
-    global get_url
-    get_url = url + "?" + parameter
-
-    while True:
-        r = requests.get(url=get_url)
-        if r.status_code == 200:
-            break
-        else:
-            time.sleep(1)
-            print("请求连接出错,等待1s...")
-
-    response_result_json = r.json()
-    del params['sign']
-    return response_result_json
-
-
-def get_wd_order_task(start, end, account):
-    order_list = []
-    url = 'https://bi.reading.163.com/dist-api/rechargeList'
-    consumerkey = account[0]
-    secretkey = account[1]
-    siteid = account[2]
-    stage = account[3]
-    siteid_list = get_wd_account_siteid_list(account)
-    # print(siteid_list)
-    if len(siteid_list) == 0:
-        siteid_list.append(siteid)
-
-    starttime = du.date_str_to_str(start) + '0000'
-    endtime = du.date_str_to_str(end) + '2359'
-
-    for siteid in siteid_list:
-
-        page = 1
-        while True:
-            params = {
-                'consumerkey': consumerkey,
-                'secretkey': secretkey,
-                'timestamp': int(1601481600),
-                'siteid': siteid,
-                'pageSize': 1000,
-                'starttime': starttime,
-                'endtime': endtime,
-                'page': page}
-
-            response_result_json = get_wending_json_object(url, params)
-            # print(response_result_json)
-
-            order_item_list = response_result_json['data']['rechargeList']
-
-            for x in order_item_list:
-                order_time = DateUtils.stamp_to_str(x['createTime'])
-                reg_time = DateUtils.stamp_to_str(x['userRegisterTime'])
-
-                order_list.append(
-                    (order_time[:10],
-                     stage,
-                     '文鼎',
-                     x['wx_mpName'],
-                     x['wx_originalId'],
-                     x['wx_user_openId'],
-                     order_time,
-                     reg_time,
-                     x['money'] / 100,
-                     x['bookTitle'] if x['bookTitle'] else '',
-                     x['ewTradeId'] if x.get('ewTradeId') else x['rechargeUuid'],
-                     2 if x['payStatus'] == 1 else 1
-                     # ,x['userId']
-                     )
-                )
-            if len(order_item_list) < 1000:
-                break
-            else:
-                page += 1
-    print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}")
-    if order_list.__len__() > 0:
-        # print(order_list)
-        save_order(order_list)
-
-
-def get_gf_order_task(start, end, account):
-    order_list = []
-    url = 'https://bi.reading.163.com/dist-api/rechargeList'
-    consumerkey = account[0]
-    secretkey = account[1]
-    siteid = account[2]
-    stage = account[3]
-    siteid_list = get_wd_account_siteid_list(account)
-    # print(siteid_list)
-    if len(siteid_list) == 0:
-        siteid_list.append(siteid)
-
-    starttime = du.date_str_to_str(start) + '0000'
-    endtime = du.date_str_to_str(end) + '2359'
-
-    for siteid in siteid_list:
-
-        page = 1
-        while True:
-            params = {
-                'consumerkey': consumerkey,
-                'secretkey': secretkey,
-                'timestamp': int(1601481600),
-                'siteid': siteid,
-                'pageSize': 1000,
-                'starttime': starttime,
-                'endtime': endtime,
-                'page': page}
-
-            response_result_json = get_wending_json_object(url, params)
-            # print(response_result_json)
-            print(response_result_json)
-            order_item_list = response_result_json['data']['rechargeList']
-            print(order_item_list)
-            for x in order_item_list:
-                order_time = DateUtils.stamp_to_str(x['createTime'])
-                reg_time = DateUtils.stamp_to_str(x['userRegisterTime'])
-
-                order_list.append(
-                    (order_time[:10],
-                     stage,
-                     '国风',
-                     x['wx_mpName'],
-                     x['wx_originalId'],
-                     x['wx_user_openId'],
-                     order_time,
-                     reg_time,
-                     x['money'] / 100,
-                     x['bookTitle'] if x['bookTitle'] else '',
-                     x['ewTradeId'] if x.get('ewTradeId') else x['rechargeUuid'],
-                     2 if x['payStatus'] == 1 else 1
-                     # ,x['userId']
-                     )
-                )
-            if len(order_item_list) < 1000:
-                break
-            else:
-                page += 1
-    print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}")
-    if order_list.__len__() > 0:
-        print(order_list)
-        sum = 0
-        for _ in order_list:
-            sum = sum + _[8]
-        print(sum)
-        save_order(order_list)
-
-
-def get_zd_order_task(start, end, account):
-    """开始到结束最多90天"""
-    order_list = []
-    url = 'https://api.zhangdu520.com/channel/getorder'
-
-    uid = account[0]
-    appsecert = account[1]
-    channel = account[2]
-    stage = account[3]
-    timestamp = int(time.time())
-    sign = md5(str(uid) + '&' + appsecert + '&' + str(timestamp))
-
-    for i in du.split_date2(start, end, 90):
-        starttime = DateUtils.str_to_stamp(i[0] + ' 00:00:00', '%Y-%m-%d %H:%M:%S')
-        endtime = DateUtils.str_to_stamp(i[1] + ' 23:59:59', '%Y-%m-%d %H:%M:%S')
-        page = 1
-        while True:
-            params = {
-                'uid': uid,
-                'timestamp': timestamp,
-                'sign': sign,
-                'starttime': starttime,
-                'endtime': endtime,
-                'page': page
-            }
-            response_result_json = requests.get(url=url, params=params).json()
-            # print(response_result_json)
-            if 'data' not in response_result_json.keys():
-                print(f'掌读账号【{uid}】, 查询时间【{i[0]} - {i[1]}】,本次请求数据异常,响应报文【{response_result_json}】')
-                break
-
-            result_data = response_result_json['data']
-            page_count = result_data['pageCount']
-            if page_count == 0:
-                break
-
-            order_item_list = result_data['list']
-            for i in order_item_list:
-                order_time = DateUtils.stamp_to_str(i['ctime'])
-                reg_time = DateUtils.stamp_to_str(i['regtime'])
-                order_list.append((
-                    order_time[:10],
-                    stage,
-                    '掌读',
-                    channel,
-                    uid,
-                    i['openid'],
-                    order_time,
-                    reg_time,
-                    i['amount'],
-                    i['book_entry'],
-                    i['orderno'],
-                    2 if i['status'] == '1' else 1
-                    # ,i['userid']
-
-                ))
-
-            if page == page_count:  # 是最后一页
-                break
-            page = page + 1
-
-    print(f"{channel} [{start}]~[{end}] 有订单 {order_list.__len__()}")
-    if len(order_list) > 0:
-        # print(order_list)
-        save_order(order_list)
-
-
-def get_zzy_order_task(start, end, account):
-    url = 'https://inovel.818tu.com/partners/channel/channels/list?'
-    key = account[0]
-    secert = account[1]
-    stage = account[2]
-    sign = md5(secert + 'key=' + key)
-    params = 'key=' + key + '&sign=' + sign
-    response_result_json = requests.get(url + params).json()  # 获取子渠道列表
-
-    if 'data' not in response_result_json.keys():
-        print('掌中云账号【{key}】本次请求数据异常,响应报文【{result}】'.format(key=key, result=response_result_json))
-        return
-    items = response_result_json['data']['items']
-    print(f'VIP{account[0]} 有公众号{len(items)} ')
-
-    total_order_list = []
-    for channel in items:
-        # 获取channel_id 后逐个拉取历史orders
-        order_list = get_zzy_channel_order(start, end, account, channel)
-        total_order_list.extend(order_list)
-    print(f"{stage} [{start}]~[{end}] 有订单{total_order_list.__len__()}")
-
-    if len(total_order_list) > 0:
-        save_order(total_order_list)
-
-
-def get_zzy_channel_order(start, end, account, channel):
-    get_time = DateUtils.str_to_date_str(start, f2="%Y-%m-%dT%H:%M:%S+08:00")
-    limit_time = DateUtils.str_to_date_str(end, f2="%Y-%m-%dT%H:%M:%S+08:00")
-    order_list = []
-    key = account[0]
-    secert = account[1]
-    stage = account[2]
-    order_url = 'https://openapi.818tu.com/partners/channel/orders/list?'
-    channel_id = channel['id']
-    channel_name = channel['nickname']
-    status = str(1)
-    page = str(1)
-    per_page = str(1000)
-    gte = parse.urlencode({'created_at[gte]': get_time})  # gte就是ge 大于等于开始时间
-    lt = parse.urlencode({'created_at[lt]': limit_time})  # 小于 结束时间
-
-    while True:
-        sign = md5(secert + 'channel_id=' + str(
-            channel_id) + '&created_at[gte]=' + get_time + '&created_at[lt]=' + limit_time + '&key=' + key + '&page=' + str(
-            page) + '&per_page=' + per_page + '&status=' + status)
-        params = 'channel_id=' + str(channel_id) + '&' + gte + '&' + lt + '&page=' + str(
-            page) + '&per_page=' + per_page + '&status=' + status + '&key=' + key + '&sign=' + sign
-        while True:
-
-            r = requests.get(order_url + params)
-
-            if r.status_code == 200:
-                response_result_json = r.json()
-                break
-            else:
-                time.sleep(61)
-                print("掌中云接口调用sleep 61s...")
-
-        # print(response_result_json)
-        if 'data' not in response_result_json.keys():
-            print(f'掌中云账号【{key}】,查询时间【{start} - {end}】,渠道【{channel_name}】本次请求数据异常,响应报文【{r.text}】')
-            break
-
-        total_count = response_result_json['data']['count']  # 总数量
-        order_item_list = response_result_json['data']['items']  # 订单列表
-
-        for i in order_item_list:
-            order_time = DateUtils.str_to_date_str(i['created_at'], "%Y-%m-%dT%H:%M:%S+08:00", "%Y-%m-%d %H:%M:%S")
-            reg_time = DateUtils.str_to_date_str(i['member']['created_at'], "%Y-%m-%dT%H:%M:%S+08:00",
-                                                 "%Y-%m-%d %H:%M:%S")
-
-            order_list.append((
-                order_time[:10],
-                stage,
-                '掌中云',
-                channel_name,
-                channel_id,
-                i['member']['openid'],
-                order_time,
-                reg_time,
-                round(i['price'] / 100, 2),
-                i['from_novel']['title'] if str(i['from_novel_id']) != 'None' else '',
-                str(i['id']),
-                2 if i['status'] == 1 else 1
-                # ,i['id']
-            ))
-
-        if int(page) >= math.ceil(total_count / int(per_page)):
-            break
-        page = int(page) + 1
-    # print(f"{channel_name}获取订单:{order_list.__len__()}")
-    # print(order_list)
-    return order_list
-
-
-def get_account(plactform, id=None):
-    op = f" and id={id} " if id else ''
-    data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}")
-    new_data = []
-    for i in data:
-        new_data.append(i[0].replace('\n', '').split(","))
-    return new_data
-
-
-if __name__ == '__main__':
-    # account = "347347942,e0c361b54a35a55c2b6296b5a80867ce,趣程小程序"
-    # get_hs_order_task('2021-05-01','2021-05-07',account.split(","))
-
-    # print(DateUtils.stamp_to_str(1612155476,'%Y-%m-%d %H:%M:%S')[:10])
-    # exit(0)
-
-    st = et = '2021-05-07'
-
-    account = "62140324,KUUxPIokqtIrtvHQ,1025010,趣程19期,qucheng19qi@163.com"
-    get_wd_order_task(st, et, account.split(','))

+ 0 - 95
app/api_data/platform_order/wandu.py

@@ -1,95 +0,0 @@
-import time
-from model.ComUtils import md5, split_int
-import json
-from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils
-import requests
-from app.api_data.platform_order.order_util import save_order
-
-ut = DateUtils()
-db = MysqlUtils()
-
-
-def get_all_channel():
-    sql = '''
-    select name,wechat_account_id from advertiser_vx av 
-    '''
-    vx_list = db.quchen_text.get_data_list(sql)
-    res = {}
-    for channel, account_id in vx_list:
-        res[account_id] = channel
-    return res
-
-
-def get_wandu_order(st, et, account, channel_info):
-    print('get in ')
-    baseurl = 'http://vipzeus.666shuwu.cn/api/vipoutput/getorder'
-    api_secret, uid, stage = account
-    print(account)
-    start_time = DateUtils.str_to_stamp(st)
-    # 结束时间不能超过当前的时间戳
-    if et == ut.getNow():
-        end_time = DateUtils.str_to_stamp(ut.get_n_minutes_ago(), "%Y-%m-%d %H:%M:%S")
-    else:
-        end_time = DateUtils.str_to_stamp(et) + 86399
-    params = {}
-    page = 1
-    li = []
-
-    while True:
-        # 时间
-        params['starttime'] = start_time
-        params['endtime'] = end_time
-        # 基础信息
-        params['uid'] = uid
-        timestamp = int(time.time())
-        params['timestamp'] = timestamp
-        sign = md5(uid + api_secret + str(timestamp))
-        print(sign)
-        params['sign'] = sign
-        params['page'] = page
-        rsp = requests.get(baseurl, params=params)
-        print(rsp.text)
-        print(json.dumps(rsp.json(), ensure_ascii=False))
-
-        print(rsp.text)
-        rsp_json = rsp.json()
-        page = int(rsp_json['data']['page'])
-        page_size = int(rsp_json['data']['count_page'])
-        # 存入数据
-        rsp_info = rsp_json['data']['list']
-        for _ in rsp_info:
-            struct_time = time.localtime(_['ctime'])  # 得到结构化时间格式
-            order_time = time.strftime("%Y-%m-%d", struct_time)
-            order_time_detail = time.strftime('%Y-%m-%d %H:%M:%S', struct_time)
-
-            struct_time_reg = time.localtime(_['regtime'])  # 得到结构化时间格式
-            reg_time_detail = time.strftime('%Y-%m-%d %H:%M:%S', struct_time_reg)
-            # print(_['status'],type(_['status']))
-            order_status = 2 if _['status'] == 3 else 1
-
-            li.append((order_time,
-                       stage,
-                       '万读',
-                       channel_info[_['appid']],
-                       0,
-                       _['openid'],
-                       order_time_detail,
-                       reg_time_detail,
-                       _['amount'],
-                       '',
-                       _['orderno'],
-                       order_status
-
-                       ))
-            # 外出
-        if page > page_size or page == page_size:
-            break
-        page = page + 1
-    save_order(li)
-
-
-if __name__ == '__main__':
-    channel_info = get_all_channel()
-    get_wandu_order('2019-08-01', '2021-10-20', '7ffedfa36431ca09c231bbdc8ca12217,6121,趣程43期', channel_info)
-    # get_yuewen_order_task('2021-08-01', '2021-08-02', a.split(','))

+ 0 - 249
app/api_data/platform_order/yangguang.py

@@ -1,249 +0,0 @@
-from model.DataBaseUtils import MysqlUtils
-from model.DingTalkUtils import DingTalkDecorators, DingTalkUtils
-from model.DateUtils import DateUtils
-from model import ComUtils
-import time
-import json
-import requests
-
-db = MysqlUtils()
-du = DateUtils()
-
-
-@DingTalkDecorators("阳光")
-def yangguang(start=None, end=None):
-    get_channel_info()
-
-    accounts = get_account("阳光")
-
-    if start:
-        start = start + ' 00:00:00'
-        end = end + ' 23:59:59'
-    else:
-        start = du.getTodayOrYestoday() + ' 00:00:00'
-        end = du.get_n_hours_ago(0)
-
-    client_id = 10008097
-    token = '2xa1d55tTPBjeEA8Ho'
-
-    if accounts.__len__() == 0:
-        return
-    else:
-        print(f"阳光账号数:{accounts.__len__()}")
-
-    for i in accounts:
-        stage = i[0]
-        vip_id = i[1]
-        print(stage, vip_id)
-        print(vip_id)
-        # get_yg_vip_channel(stage, vip_id, client_id, token)
-        get_yg_data(stage, vip_id, client_id, token, start, end)
-
-    print(check())
-    parse_order_data()
-
-
-def get_yg_data(stage, vip_id, client_id, token, start, end):
-    url = "https://data.yifengaf.cn:443/channeldata/data/orders/list"
-    nonce = ComUtils.get_random_str()
-    timestamp = int(time.time())
-    signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
-    params = {
-        "client_id": client_id,
-        "token": token,
-        "nonce": nonce,
-        "timestamp": timestamp,
-        "signaure": signaure,
-        "vip_id": vip_id,
-        "start_time": start,  # %Y-%m-%d %H:%i:%s:
-        "end_time": end
-    }
-    headers = {"Content-Type": "application/json"}
-    for i in range(5):
-        try:
-            r = requests.post(url=url, data=json.dumps(params), headers=headers, timeout=5)
-            break
-        except:
-            pass
-        raise
-    print(vip_id, r.text)
-    task_id = json.loads(r.text).get("data").get("task_id")
-    db.quchen_text.execute(
-        f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','order')")
-
-
-def get_yg_vip_channel(stage, vip_id, client_id, token):
-    url = "https://data.yifengaf.cn:443/channeldata/data/account/list"
-    nonce = ComUtils.get_random_str()
-    timestamp = int(time.time())
-    signaure = ComUtils.sha1(str(token) + str(timestamp) + str(client_id) + str(nonce))
-    params = {
-        "client_id": client_id,
-        "token": token,
-        "nonce": nonce,
-        "timestamp": timestamp,
-        "signaure": signaure,
-        "vip_id": vip_id,
-    }
-    headers = {"Content-Type": "application/json"}
-    r = requests.post(url=url, data=json.dumps(params), headers=headers)
-    print(r.text)
-    task_id = json.loads(r.text).get("data").get("task_id")
-    db.quchen_text.execute(
-        f"replace into yangguang_path(vip_id,task_id,stage,type) values ('{vip_id}','{task_id}','{stage}','channel')")
-
-
-def get_channel_info():
-    accounts = get_account("阳光")
-    client_id = 10008097
-    token = '2xa1d55tTPBjeEA8Ho'
-    for i in accounts:
-        stage = i[0]
-        vip_id = i[1]
-        print(vip_id)
-        get_yg_vip_channel(stage, vip_id, client_id, token)
-
-
-def parse_order_data():
-    print(111)
-    accounts = get_account("阳光")
-
-    for i in accounts:
-        # print(i)
-        try:
-            vip_id = i[1]
-            stage = i[0]
-            data = parse_order(vip_id, stage)
-            save_data(data)
-        except Exception as e:
-            DingTalkUtils().send(msg='阳光出错vipid:' + str(vip_id), phone='15168342316')
-            print(e)
-
-
-def parse_order(vip_id, stage):
-    print(vip_id)
-    url = db.quchen_text.getOne(f"select path from yangguang_path where type='channel' and vip_id={vip_id} ")
-    for i in range(5):
-        try:
-            r = requests.get(url, timeout=5).text
-            print(r)
-            break
-        except:
-            pass
-        raise
-
-    channel_di = {}
-    a = r.split('}')
-    for i in a[:-1]:
-        if i[-1] != '}':
-            b = json.loads(i + "}", strict=False)
-
-        else:
-            b = json.loads(i, strict=False)
-        channel_di[b["channel_id"]] = (b["wx_nickname"], b['app_id'])
-
-    # print(channel_di)
-    print(f'{stage} 有channel数:{len(channel_di)}')
-
-    info = db.quchen_text.getData(f"select stage,path from yangguang_path where type='order' and vip_id={vip_id}")
-    stage = info[0][0]
-    path = info[0][1]
-    for i in range(5):
-        try:
-            text = requests.get(path, timeout=5).text.replace('"referral_url":,', '')
-            print(text)
-            break
-        except Exception as e:
-            print('channel', e)
-            # raise
-    insert_data = []
-    for j in text.split("}")[:-1]:
-        if j[-1] != '}':
-            j = j + '}'
-        try:
-            di = json.loads(j, strict=False)
-        except Exception as e:
-            print(j)
-            print(e)
-
-        insert_data.append((
-            di["create_time"][:10],
-            stage,
-            '阳光',
-            channel_di[di['channel_id']][0],
-            di['channel_id'],
-            di['openid'],
-            di['create_time'],
-            di['user_createtime'],
-            di['money'],
-            di.get('book_name'),
-            di['merchant_id'],
-            2 if di['state'] == "完成" else 1,
-            di['user_id'],
-            channel_di[di['channel_id']][1],
-            1 if di['type'] == '书币充值' else 2,
-            di['merchant_id'],
-            di['transaction_id']
-        ))
-    # print(insert_data)
-    # exit(0)
-    print("订单数:" + str(insert_data.__len__()))
-    save_data(insert_data)
-
-
-def save_data(data):
-    sql = """replace into ods_order(date,stage,platform,channel,channel_id,user_id,order_time,
-    reg_time,amount,from_novel,order_id,status,platform_user_id,wechat_app_id,order_type,
-    trade_no,transaction_no) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
-    # print(sql)
-
-    db.quchen_text.executeMany(sql, data)
-
-
-def check():
-    x = 1
-    while True:
-        a = db.quchen_text.getOne("select count(1) from yangguang_path where type ='order' and path is null")
-        print(f" 回调接口 待处理数量 {a} ")
-        if a == 0:
-            info = '回调完成'
-            break
-        time.sleep(60)
-        x += 1
-        if x > 10:
-            DingTalkUtils().send('阳光订单回调延时10min', '15168342316')
-            info = '回调未完成'
-            break
-    return info
-
-
-def get_account(plactform, id=None):
-    op = f" and id={id} " if id else ''
-    data = db.quchen_text.getData(f"select text from order_account_text where platform='{plactform}' {op}")
-    new_data = []
-    for i in data:
-        new_data.append(i[0].replace('\n', '').split(","))
-    return new_data
-
-
-def daily_yg():
-    st = du.get_n_days(-10)
-    et = du.get_n_days(-1)
-    yangguang(st, et)
-
-
-if __name__ == '__main__':
-    # get_channel_info()
-    # exit(0)
-    yangguang(start=du.get_n_days(-2), end=du.get_n_days(0))
-
-    # yangguang('2021-05-28','2021-05-28')
-    # daily_yg()
-    # for i in du.split_date2('2020-06-28','2020-11-03',30):
-    #     print(i)
-    #     yangguang(i[0], i[1])
-
-    # parse_order_data()
-    # get_channel_info()
-    # yangguang(start=du.get_n_days(-1),end=du.get_n_days(0))
-    # parse_order('29600', stage='趣程27期')

+ 0 - 68
app/api_data/platform_order/youshuge.py

@@ -1,68 +0,0 @@
-import time
-from model.DateUtils import DateUtils
-from model.ComUtils import md5
-import requests
-from app.api_data.platform_order.order_util import save_order
-
-ut = DateUtils()
-
-
-def get_youshuge_order_task(st, et, account):
-    et = ut.add_days(et, 1)
-    url = 'https://novel.youshuge.com/v2/open/orders'
-    host_name = account[0]
-    channel_id = int(account[1])
-    secert_key = account[2]
-    channel = account[3]
-    stage = account[4]
-    page = 1
-
-    li = []
-    while True:
-
-        timestamp = int(time.time())
-        sign = md5('channel_id=' + str(channel_id) + '&end_date=' + et + '&host_name=' + host_name + '&page='
-                   + str(page) + '&start_date=' + st + '&time=' + str(
-            timestamp) + '&key=' + secert_key).upper()
-        params = {
-            'sign': sign,
-            'host_name': host_name,
-            'time': timestamp,
-            'channel_id': channel_id,
-            'page': page,
-            'start_date': st,
-            'end_date': et
-        }
-        r = requests.post(url, params).json()
-        # print(r)
-
-        order_item_list = r['data']
-        if len(order_item_list) == 0:
-            break
-
-        for i in order_item_list:
-            li.append((i["create_time"][:10],
-                       stage,
-                       '悠书阁',
-                       channel,
-                       channel_id,
-                       i['openid'],
-                       i["create_time"],
-                       i['reg_time'],
-                       int(i['price']) / 100,
-                       i['book_name'],
-                       i['order_num'],
-                       2 if i['pay_status'] == '1' else 1
-                       ))
-
-        page += 1
-
-    if len(li) > 0:
-        print(f"{channel} 有订单{len(li)}")
-        save_order(li)
-
-
-if __name__ == '__main__':
-    a = "趣程15期,10696,8OC7SNCL46ZEI7JBACXFDM8CP5JM1FSL,盛德文苑,趣程15期"
-    a = '趣程清勇6,10675,B6FK1HFW1V9ZPD6WX0FGBX3WP73E6M5V,璃月文楼,清勇7月'
-    get_youshuge_order_task('2021-01-06', '2021-08-06', a.split(','))

+ 0 - 119
app/api_data/platform_order/yuewen.py

@@ -1,119 +0,0 @@
-import time
-from model.DateUtils import DateUtils
-from model.ComUtils import md5
-import requests
-from model.ComUtils import split_int
-from app.api_data.platform_order.order_util import save_order
-from model.DingTalkUtils import DingTalkUtils
-ut = DateUtils()
-
-def get_yuewen_order_task(st, et, account):
-    """相同参数一分钟只能调用一次"""
-    email = account[0]
-    appsecert = account[1]
-
-    url = 'https://open.yuewen.com/cpapi/wxRecharge/querychargelog'
-
-    start_time = DateUtils.str_to_stamp(st)
-    # 结束时间不能超过当前的时间戳
-    if et == ut.getNow():
-        end_time = DateUtils.str_to_stamp(ut.get_n_minutes_ago(), "%Y-%m-%d %H:%M:%S")
-    else:
-        end_time = DateUtils.str_to_stamp(et) + 86399
-
-
-    for i in split_int(start_time,end_time,10000):
-        start = i[0]
-        end = i[1]
-        print(ut.stamp_to_str(end))
-
-
-        page = 1
-        last_min_id = ''
-        last_max_id = ''
-        total_count = ''
-        last_page = ''
-
-
-
-        li = []
-        while True:
-
-            params = {
-                'email': email,
-                'timestamp': int(time.time()),
-                'start_time': start,
-                'end_time': end,
-                'page': page,
-                'version':1
-            }
-
-            if page > 1:
-                params['last_min_id'] = last_min_id
-                params['last_max_id'] = last_max_id
-                params['total_count'] = total_count
-                params['last_page'] = last_page
-
-            sorted_data = sorted(params.items())
-            str_params = ''
-            for k, v in sorted_data:
-                str_params = str_params + str(k) + str(v)
-            sign = md5(appsecert + str_params).upper()
-
-            # 放入签名
-            params['sign'] = sign
-            # print(params)
-            response_result_json = requests.get(url=url, params=params).json()
-
-            # print(len(response_result_json["data"]["list"]))
-            if not response_result_json.get('data'):
-                print(response_result_json)
-                DingTalkUtils().send('阅文订单拉取失败')
-
-
-            response_data = response_result_json['data']
-            total_count = response_data['total_count']
-
-
-            last_min_id = response_data['min_id']
-            last_max_id = response_data['max_id']
-            last_page = response_data['page']
-            order_item_list = response_data['list']
-
-            if len(order_item_list) == 0:
-                break
-
-            for i in order_item_list:
-                order_time = i["order_time"]
-                li.append((order_time[:10],
-                           '',
-                           '阅文',
-                           i['app_name'],
-                           i['channel_id'],
-                           i['openid'],
-                           i['order_time'],
-                           i['reg_time'],
-                           i['amount'],
-                           i['book_name'],
-                           i['order_id'],
-                           i['order_status']
-
-                ))
-
-            if len(order_item_list) < 100:
-                break
-            else:
-                page += 1
-
-        if len(li) > 0:
-            print(f"{email} 有订单{len(li)}")
-            save_order(li)
-
-
-
-if __name__ == '__main__':
-    a = "guangzhouliuqi2@sina.com,10ce1dd6ccb330a82b73701d1e78f518"
-    b = "mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee"
-    {"version": "1", "app_secret": "74ca754515fa253c8ab790603cebc2ee", "email": "mqud82950@163.com"}
-    a = 'mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee'
-    get_yuewen_order_task('2021-08-01', '2021-08-02', a.split(','))

+ 0 - 117
app/api_data/platform_order/yuewen_fast_app.py

@@ -1,117 +0,0 @@
-import time
-from model.DateUtils import DateUtils
-from model.ComUtils import md5
-import requests
-from model.ComUtils import split_int
-from app.api_data.platform_order.order_util import save_order
-from model.DingTalkUtils import DingTalkUtils
-
-ut = DateUtils()
-
-
-def get_yuewen_fast_app_order_task(st, et, account):
-    """相同参数一分钟只能调用一次"""
-    email = account[0]
-    appsecert = account[1]
-
-    url = 'https://open.yuewen.com/cpapi/wxRecharge/quickappchargelog'
-
-    start_time = DateUtils.str_to_stamp(st)
-    # 结束时间不能超过当前的时间戳
-    if et == ut.getNow():
-        end_time = DateUtils.str_to_stamp(ut.get_n_minutes_ago(), "%Y-%m-%d %H:%M:%S")
-    else:
-        end_time = DateUtils.str_to_stamp(et) + 86399
-
-    for i in split_int(start_time, end_time, 10000):
-        print(i)
-        start = i[0]
-        end = i[1]
-
-        page = 1
-        last_min_id = ''
-        last_max_id = ''
-        total_count = ''
-        last_page = ''
-
-        li = []
-        while True:
-
-            params = {
-                'email': email,
-                'coop_type': 11,
-                'timestamp': int(time.time()),
-                'start_time': start,
-                'end_time': end,
-                'version': 1
-            }
-
-            if page > 1:
-                params['last_min_id'] = last_min_id
-                params['last_max_id'] = last_max_id
-                params['total_count'] = total_count
-                params['last_page'] = last_page
-
-            sorted_data = sorted(params.items())
-            str_params = ''
-            for k, v in sorted_data:
-                str_params = str_params + str(k) + str(v)
-            sign = md5(appsecert + str_params).upper()
-
-            # 放入签名
-            params['sign'] = sign
-            # print(params)
-            rsp = requests.get(url=url, params=params)
-            print(rsp.text)
-            response_result_json=rsp.json()
-            print(response_result_json)
-            print(len(response_result_json["data"]["list"]))
-            if not response_result_json.get('data'):
-                DingTalkUtils().send('阅文订单拉取失败')
-
-            response_data = response_result_json['data']
-            total_count = response_data['total_count']
-
-            last_min_id = response_data['min_id']
-            last_max_id = response_data['max_id']
-            last_page = response_data['page']
-            order_item_list = response_data['list']
-
-            if len(order_item_list) == 0:
-                break
-
-            for i in order_item_list:
-                order_time = i["order_time"]
-                li.append((order_time[:10],
-                           '',
-                           '阅文快应用',
-                           'fast_app_'+i['app_name'],
-                           '',
-                           i['openid'],  # 这部分可能要修改,openid对应的微信相关id
-                           i['order_time'],
-                           i['reg_time'],
-                           i['amount'],
-                           i['book_name'],
-                           i['order_id'],
-                           i['order_status']
-
-                           ))
-
-            if len(order_item_list) < 100:
-                break
-            else:
-                page += 1
-        print(li)
-        if len(li) > 0:
-            print(f"{email} 有订单{len(li)}")
-            save_order(li)
-
-
-if __name__ == '__main__':
-    a = "guangzhouliuqi2@sina.com,10ce1dd6ccb330a82b73701d1e78f518"
-    b = "mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee"
-    {"version": "1", "app_secret": "74ca754515fa253c8ab790603cebc2ee", "email": "mqud82950@163.com"}
-    a = 'mqud82950@163.com,74ca754515fa253c8ab790603cebc2ee'
-    a = 'qucheng66qi@163.com,21a8345b00ef9e062c940887e0dbedb3'
-
-    get_yuewen_fast_app_order_task('2021-09-16', '2021-09-25', a.split(','))

+ 10 - 138
app/api_data/tx_ad_cost/get_cost_older.py

@@ -24,7 +24,7 @@ def md5value(s):
 def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0):
     logging.info(f'开始获取消耗数据,token:{access_token}, id:{account_id}, st:{str(st)}, et:{str(et)}')
     interface = 'daily_reports/get'
-    url = 'https://api.e.qq.com/v1.1/' + interface
+    url = 'https://api.e.qq.com/v1.3/' + interface
 
     common_parameters = {
         'access_token': access_token,
@@ -108,65 +108,13 @@ def get_v_data(y, li, st, et):
         logging.error('vx account:{}   error :{}'.format(str(y),str(e)))
 
 
-def get_tt_data(account_info, li, st, et):
-    def build_url_ad(path, query=""):
-        # type: (str, str) -> str
-        """
-        Build request URL
-        :param path: Request path
-        :param query: Querystring
-        :return: Request URL
-        """
-        scheme, netloc = "https", "ad.oceanengine.com"
-        return urlunparse((scheme, netloc, path, "", query, ""))
-
-    page_num = 1
-    advertiser_ids = account_info[1]
-    for advertiser_id in advertiser_ids:
-        while True:
-            # account_info
-            my_args = {
-                "start_date": st,
-                "end_date": et,
-                "page_size": 100,
-                "page": page_num,
-                # "agent_id" : "1708974248093789",
-                'advertiser_id': advertiser_id,
-                # "start_date": "%s"
-            }
-            PATH = "/open_api/2/report/advertiser/get/"
-            args = json.loads(json.dumps(my_args))
-            query_string = urlencode({k: v if isinstance(v, string_types) else json.dumps(v) for k, v in args.items()})
-            url = build_url_ad(PATH, query_string)
-            headers = {
-                "Access-Token": account_info[0],
-            }
-            rsp = requests.get(url, headers=headers)
-            '''
-            date,cost,view_count,valid_click_count,
-        ctr,official_account_follow_rate,order_amount,
-        order_roi,order_count,order_rate,order_unit_price,
-        web_order_cost,first_day_order_amount,first_day_order_count,account_id
-            '''
-            # print(account_info)
-            # print(rsp.text)
-            result = rsp.json()
-            for _ in result['data']['list']:
-                campaign_info = (_['stat_datetime'][:10], _['cost'] * 100, _['show'], _['click'],
-                                 _['ctr'], None, None,
-                                 None, None, None, None,
-                                 None, None, None, advertiser_id)
-                li.append(campaign_info)
-            total_page = result['data']['page_info']['total_page']
-            if page_num > total_page or page_num == total_page:
-                break
-            else:
-                page_num = page_num + 1
 
 
 def get_vx_list():
-    sql = "select account_id,wechat_account_id,access_token,refresh_token,name," \
-          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
+    sql = '''select account_id,wechat_account_id,access_token,refresh_token,name,
+          ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx
+          where access_token is not null
+          '''
     a = db.quchen_text.getData(sql)
     return a
 
@@ -194,14 +142,6 @@ def mysql_insert_daily_qq(data):
     db.quchen_text.executeMany(a, data)
 
 
-def mysql_insert_daily_tt(data):
-    b = """replace into daily_tt (date,cost,view_count,valid_click_count,ctr,
-    official_account_follow_rate,order_amount,
-	order_roi,order_count,order_rate,order_unit_price,
-	web_order_cost,first_day_order_amount,
-	first_day_order_count,account_id)
-	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-    db.quchen_text.executeMany(b, data)
 
 
 def get_daily_vx(st, et):
@@ -232,80 +172,13 @@ def get_daily_qq(st, et):
     mysql_insert_daily_qq(li)
 
 
-def get_daily_tt(st, et):
-    def refresh_access_token(appid, account_id, secret, refresh_token):
-        open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
-        uri = "oauth2/refresh_token/"
-        refresh_token_url = open_api_url_prefix + uri
-        data = {
-            "appid": appid,
-            "secret": secret,
-            "grant_type": "refresh_token",
-            "refresh_token": refresh_token,
-        }
-        rsp = requests.post(refresh_token_url, json=data)
-        rsp_data = rsp.json()
-        new_refresh_token = rsp_data['data']['refresh_token']
-        new_access_token = rsp_data['data']['access_token']
-        sql = f'''
-        update bytedance_login_info
-        set refresh_token='{new_refresh_token}' ,access_token='{new_access_token}'
-        where appid='{appid}' and account_id='{account_id}'
-        '''
-        db.quchen_text.execute(sql)
-        return rsp_data['data']['access_token']
-
-    # 1.获取refresh_token
-    sql = '''
-    select appid,account_id,secret,refresh_token from bytedance_login_info
-    '''
-    accounts_info = db.quchen_text.getData(sql)
-
-    # 2.刷新refresh_token,并获取最新的access_token
-    for account_info in accounts_info:
-        appid, account_id, secret, refresh_token = account_info
-        access_token = refresh_access_token(appid, account_id, secret, refresh_token)
-        # 3.获取agent_id
-        sql = f'''
-        select distinct(advertiser_id) from advertiser_bytedance
-        where appid='{appid}' and account_id='{account_id}'
-        '''
-        advertiser_ids = db.quchen_text.getData(sql)
-        logging.info("获取头条账号:" + str(advertiser_ids.__len__()))
-        advertiser_ids = [_[0] for _ in advertiser_ids]
-        # token,adv_ids
-        account_info = (access_token, advertiser_ids)
-        time1 = time.time()
-        li = []
-        get_tt_data(account_info, li, st, et)
-        logging.info('get_tt_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
-        mysql_insert_daily_tt(li)
-
-def get_token_bytedance():
-    #添加bytedance账号,需要添加一下access_token
-    open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
-    uri = "oauth2/access_token/"
-    url = open_api_url_prefix + uri
-    data = {
-        "app_id": 1709866698360883,
-        "secret": "****",
-        "grant_type": "auth_code",
-        "auth_code": "********"
-    }
-    rsp = requests.post(url, json=data)
-    rsp_data = rsp.json()
-    return rsp_data['data']['access_token']
-
 def run(st, et):
     logging.info('微信消耗数据拉取,开始')
     get_daily_vx(st, et)
     logging.info('微信消耗数据拉取,结束')
-    logging.info('qq消耗数据拉取,开始')
-    get_daily_qq(st, et)
-    logging.info('qq消耗数据拉取,结束')
-    logging.info('头条消耗数据拉取,开始')
-    get_daily_tt(st, et)
-    logging.info('头条消耗数据拉取,结束')
+    # logging.info('qq消耗数据拉取,开始')
+    # get_daily_qq(st, et)
+    # logging.info('qq消耗数据拉取,结束')
 
 
 def old_cost_hourly():
@@ -324,7 +197,6 @@ def old_cost_daily():
 if __name__ == '__main__':
     # run()
     # old_cost_daily()
-    st = du.get_n_days(-30)
+    st = du.get_n_days(-365)
     et = du.get_n_days(0)
-    print(st, et)
-    get_daily_tt(st, et)
+    run(st, et)

+ 7 - 7
app/crontab_task/dailyRun.py

@@ -35,13 +35,13 @@ if __name__ == '__main__':
     logging.info('订单消耗数据处理,结束')
 
     # 广告数据
-    logging.info('广告数据获取,开始')
-    get_cost.day()
-    logging.info('广告数据获取,结束')
-
-    logging.info('广告数据处理,开始')
-    dw_image_cost_day.day()
-    logging.info('广告数据处理,结束')
+    # logging.info('广告数据获取,开始')
+    # get_cost.day()
+    # logging.info('广告数据获取,结束')
+    #
+    # logging.info('广告数据处理,开始')
+    # dw_image_cost_day.day()
+    # logging.info('广告数据处理,结束')
 
     # 素材库
     # logging.info('素材库数据处理,开始')

+ 1 - 1
app/crontab_task/hourlyRun.py

@@ -34,7 +34,7 @@ if __name__ == '__main__':
 
 
     if int(time.time()-st)>1500:
-        DingTalkUtils().send(f"消耗小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
+        DingTalkUtils().send(f"游戏,消耗小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
 
 
 

+ 6 - 87
app/crontab_task/task.py

@@ -1,54 +1,16 @@
 import threading
-from app.api_data.platform_order.get_order import *
 from app.api_data.tx_ad_cost.get_cost_older import old_cost_hourly, old_cost_daily
 from app.etl.sync_to_ck_task import order_sync_ck
-from app.api_data.platform_order import yangguang
 from app.api_data.tx_ad_cost import get_cost_older
 from app.etl.data_stat_run import do_cost
+from app.api_data.platform_order.order_data_change import insert_order_data_daily,insert_order_data_hourly
+from model.DateUtils import DateUtils
 
+du = DateUtils()
 
 def hourly():
-    t1 = threading.Thread(target=huasheng)
-    t2 = threading.Thread(target=qiyue)
-    t3 = threading.Thread(target=qiyueyousheng)
-    t4 = threading.Thread(target=wending)
-    t5 = threading.Thread(target=zhangdu)
-    t6 = threading.Thread(target=zhangzhongyun)
-    t7 = threading.Thread(target=yueweng)
-    t8 = threading.Thread(target=yangguang.yangguang)
-    t9 = threading.Thread(target=youshuge)
-    t10 = threading.Thread(target=guofeng)
     t11 = threading.Thread(target=old_cost_hourly)
-    t12 = threading.Thread(target=yueweng_fastapp)
-    t13 = threading.Thread(target=wandu)
-    t1.start()
-    t1.join()
-    t2.start()
-    t2.join()
-
-    t3.start()
-    t3.join()
-
-    t4.start()
-    t4.join()
-
-    t5.start()
-    t5.join()
-
-    t6.start()
-    t6.join()
-
-    t7.start()
-    t7.join()
-
-    t8.start()
-    t8.join()
-
-    t9.start()
-    t9.join()
-
-    t10.start()
-    t10.join()
+    t12 = threading.Thread(target=insert_order_data_hourly)
 
     t11.start()
     t11.join()
@@ -56,55 +18,16 @@ def hourly():
     t12.start()
     t12.join()
 
-    t13.start()
-    t13.join()
-
 def daily():
-    st = du.get_n_days(-10)
-    et = du.get_n_days(-1)
-
-    t1 = threading.Thread(target=huasheng, args=(st, et))
-    t2 = threading.Thread(target=qiyue, args=(st, et))
-    t3 = threading.Thread(target=qiyueyousheng, args=(st, et))
-    t4 = threading.Thread(target=wending, args=(st, et))
-    t5 = threading.Thread(target=zhangdu, args=(st, et))
-    t6 = threading.Thread(target=zhangzhongyun, args=(st, et))
-    t7 = threading.Thread(target=yueweng, args=(st, et))
-    t8 = threading.Thread(target=yangguang, args=(st, et))
-    t9 = threading.Thread(target=youshuge, args=(st, et))
-    t10 = threading.Thread(target=guofeng, args=(st, et))
-    t12 = threading.Thread(target=yueweng_fastapp, args=(st, et))
-    t13 = threading.Thread(target=wandu, args=(st, et))
 
     t11 = threading.Thread(target=old_cost_daily, args=())
+    t12 = threading.Thread(target=insert_order_data_daily())
 
-    t1.start()
-    t1.join()
-    t2.start()
-    t2.join()
-    t3.start()
-    t3.join()
-    t4.start()
-    t4.join()
-    t5.start()
-    t5.join()
-    t6.start()
-    t6.join()
-    t7.start()
-    t7.join()
-    t8.start()
-    t8.join()
-    t9.start()
-    t9.join()
-    t10.start()
-    t10.join()
     t11.start()
     t11.join()
+
     t12.start()
     t12.join()
-    t13.start()
-    t13.join()
-    # yangguang.get_channel_info()
 
 
 def cost_yestoday_repair():
@@ -113,10 +36,6 @@ def cost_yestoday_repair():
     do_cost(dt, dt)
 
 
-def yueweng_order_repair():
-    dt = du.get_n_days(-1)
-    yueweng(dt, dt)
-    order_sync_ck(dt)
 
 
 if __name__ == '__main__':

+ 2 - 2
app/etl/data_stat_run.py

@@ -35,7 +35,7 @@ def do_cost(st, et):
         channel_by_account_daily(i)
         channel_info_daily(i)
         dw_daily_channel_cost(i)
-        dw_daily_bytedance_cost(i)
+        # dw_daily_bytedance_cost(i)
         platform_data_sum(i)
     logging.info('消耗数据处理,结束')
 
@@ -44,7 +44,7 @@ def main(st, et):
     try:
         do_order(st, et)
         do_cost(st, et)
-        src_book_info()  # 书籍卡点信息
+        # src_book_info()  # 书籍卡点信息
         # book_annual_expect_profit.run() # 年预期收益
         dw_channel_daily()
         dw_pitcher_trend()

+ 14 - 25
app/etl/data_stat_task.py

@@ -67,15 +67,10 @@ def dw_daily_bytedance_cost(ymd):
 
 def platform_data_sum(ymd):
     logging.info('dw_daily_platform_cost开始数据更新')
-    ck.execute("alter table dw_daily_platform_cost drop  partition '{}' ".format(ymd))
+    ck.execute("alter table game_data.dw_daily_platform_cost drop  partition '{}' ".format(ymd))
     sql = f'''
-        insert into dw_daily_platform_cost
-        select * from 
-        (select * from dw_daily_bytedance_cost a where dt='{ymd}'
-        union all 
-        select * from dw_daily_channel_cost b where dt='{ymd}'
-        AND channel not  in (select channel from dw_daily_bytedance_cost a 
-        where dt='{ymd}'))
+        insert into game_data.dw_daily_platform_cost
+        select * from game_data.dw_daily_channel_cost b where dt='{ymd}'
     '''
     ck.execute(sql)
     logging.info('dw_daily_platform_cost数据更新,结束')
@@ -139,9 +134,9 @@ def dw_daily_channel_cost(ymd):
         i[11] = float(i[11])
         i[12] = float(i[12])
         data1.append(tuple(i))
-    ck.execute(f"alter table dw_daily_channel_cost drop  partition '{ymd}' ")
+    ck.execute(f"alter table game_data.dw_daily_channel_cost drop  partition '{ymd}' ")
     logging.info(len(data1))
-    ck.insertMany("dw_daily_channel_cost", col, tuple(data1))
+    ck.insertMany("game_data.dw_daily_channel_cost", col, tuple(data1))
 
 
 def channel_by_account_daily(ymd):
@@ -169,11 +164,9 @@ def channel_info_daily(ymd):
     """
     # 获取现在的全量公众号信息
     logging.info("run> channel_info_daily")
-    sql = """select '{}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
+    sql = f"""select '{ymd}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
             select  name from advertiser_vx  where name is not null group by name--  公众号全量表
             union
-            select  name from advertiser_qq where name is not null group by name
-            union
             select  name from account_change group by name
             union
             select channel as name   from pitcher_change group by channel
@@ -186,19 +179,12 @@ def channel_info_daily(ymd):
               (
                 select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,
                 ifnull(platform,'') platform,ifnull(book,'') book 
-                from advertiser_qq where name is not null  
+                from advertiser_vx 
+                where name is not null  
+                and start_date < '{ymd}'
                 group by name,stage,pitcher,platform,book
-             union
-                select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,
-                ifnull(platform,'') platform,ifnull(book,'') book 
-                from advertiser_vx where name is not null and name !=''
-            union
-                select channel,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,
-                ifnull(platform,'') platform,ifnull(book,'') book
-                from advertiser_bytedance where channel is not null and channel !=''
-                group by channel ,stage,pitcher,platform,book
                 ) b on a.name=b.name
-                """.format(ymd)
+                """
     data = db.quchen_text.get_data_list(sql)
     pitcher_change = db.quchen_text.getData(
         "select b.channel as channel,pitcher from "
@@ -270,7 +256,10 @@ if __name__ == '__main__':
     # channel_info_daily('2021-02-06')
 
     # channel_by_account_daily('2021-02-05')
-    for i in dt.getDateLists('2021-07-23', '2021-09-17'):
+    for i in dt.getDateLists('2021-09-11', '2021-10-27'):
         print(i)
+        channel_info_daily(i)
+
         dw_daily_channel_cost(i)
     # ods_order('2021-05-06')
+        platform_data_sum(i)

+ 11 - 11
app/etl/dm/dm_pitcher_daily_overview.py

@@ -31,17 +31,17 @@ select
        if(last_month_cost < 1, 0, last_month_far_amount / last_month_cost) last_month_far_roi
 from (select pitcher,
              sum(follow_user) follow_user
-      from dw_daily_channel_cost
+      from game_data.dw_daily_channel_cost
       group by pitcher) q
         left outer join
      (select count(distinct channel) channel_count, pitcher
-      from dw_daily_channel_cost
+      from game_data.dw_daily_channel_cost
       where dt = '{du.get_n_days(0)}'
       group by pitcher) w
      on q.pitcher = w.pitcher
          left outer join(
     select count(distinct channel) on_channel_count, pitcher
-    from dw_daily_channel_cost
+    from game_data.dw_daily_channel_cost
     where dt >= '{du.get_n_days(-15)}'
       and cost > 0
     group by pitcher) e
@@ -49,36 +49,36 @@ from (select pitcher,
          left outer join (
     select pitcher,
            sum(cost)  this_month_cost
-    from dw_daily_channel_cost
+    from game_data.dw_daily_channel_cost
     where dt >= '{du.get_n_pre_month_first_day(0)}'
     group by pitcher) r
                          on q.pitcher = r.pitcher
          left outer join(
     select pitcher,
            sum(cost)  last_month_cost
-    from dw_daily_channel_cost
+    from game_data.dw_daily_channel_cost
     where dt >= '{du.get_n_pre_month_first_day(1)}'
       and dt < '{du.get_n_pre_month_first_day(0)}'
     group by pitcher) t on q.pitcher = t.pitcher
          left outer join (
     select b.pitcher, sum(amount) last_month_far_amount
-    from order a
-             left outer join dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
+    from game_data.order a
+             left outer join game_data.dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
     where reg_time >= '{du.get_n_pre_month_first_day(1)} 00:00:00' and reg_time<'{du.get_n_pre_month_first_day(0)} 00:00:00' and status=2 
     group by pitcher
     ) y on q.pitcher = y.pitcher
 left outer join (
     select b.pitcher, sum(amount) last_month_amount
-    from order a
-             left outer join dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
+    from game_data.order a
+             left outer join game_data.dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
     where reg_time >= '{du.get_n_pre_month_first_day(1)} 00:00:00' and reg_time<'{du.get_n_pre_month_first_day(0)} 00:00:00' and status=2 
  and dt<'{du.get_n_pre_month_first_day(0)}'
     group by pitcher
 ) p on q.pitcher=p.pitcher
 left outer join (
     select b.pitcher, sum(amount) this_month_amount
-    from order  a
-             left outer join dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
+    from game_data.order  a
+             left outer join game_data.dw_daily_channel_cost b on a.channel = b.channel and a.date = b.dt
     where reg_time >= '{du.get_n_pre_month_first_day(0)} 00:00:00' and status=2 
     group by pitcher
 ) g on q.pitcher=g.pitcher

+ 0 - 1
app/etl/dw/dw_book_trend.py

@@ -36,7 +36,6 @@ def book_trend():
     from dw_channel a 
     left join dw_channel_amount_daily b on a.dt=b.dt and a.channel=b.channel 
     left join dw_channel_amount_daily_reverse c on a.dt=c.dt and a.channel=c.channel 
-    where a.book!=''
     GROUP BY dt,book,type"""
 
 

+ 14 - 10
app/etl/dw/dw_channel_daily.py

@@ -16,7 +16,7 @@ def dw_channel():
     sql = """
 select
        dt1,channel1,pitcher,stage,platform,book,
-       if(type not in ('BYTEDANCE'),if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP'),type )type,
+       if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP'),
        order_count,order_user,order_amount,
        first_order_count,first_order_user,first_order_amount,
        view_count,click_count,follow_user,
@@ -34,11 +34,14 @@ from (
         follow_user,web_view_count,platform_view_count,web_order_count,type,
         require_roi,require_mult,order_count,order_user,order_amount from
 
-        (select * from dw_daily_platform_cost) aa
+        (select * from game_data.dw_daily_platform_cost) aa
         full outer  join
         (select date as dt4,channel as channel4,count(1) as order_count,   ---账面充值
         count(distinct user_id) as order_user,sum(amount) as order_amount
-        from order where status=2    group by date,channel) dd
+        from game_data.order 
+        where status=2    
+        and channel in (select DISTINCT (channel) from game_data.dw_daily_channel_cost ddcc)
+        group by date,channel) dd
         on dt=dt4 and channel=channel4) a
 
 left outer join (
@@ -48,7 +51,7 @@ left outer join (
            sum(if(user_order_count>=5,1,0)) reg_order_user_again5,
            sum(if(user_order_count>=6,1,0)) reg_order_user_again6
     from (select formatDateTime(reg_time,'%Y-%m-%d') reg_date,channel,count(1) user_order_count
-    from order where status=2  group by formatDateTime(reg_time,'%Y-%m-%d') ,user_id,channel) x group by reg_date,channel
+    from game_data.order where status=2  group by formatDateTime(reg_time,'%Y-%m-%d') ,user_id,channel) x group by reg_date,channel
     ) f on dt1=dt6 and channel1=channel6
 
 left outer join
@@ -58,12 +61,12 @@ left outer join
    sum(amount) as reg_order_amount,
    count(distinct user_id) as reg_order_user,
    count(1) as reg_order_count
-   from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel) b
+   from game_data.order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel) b
     on dt1=dt2 and channel1=channel2
 left outer join
      (select date as dt3,channel as channel3,count(1) as first_order_count,          ---新用户首日充值
      count(distinct user_id) as first_order_user,sum(amount) as first_order_amount
-    from order where status=2 and toDate(reg_time)=date  group by date,channel) c
+    from game_data.order where status=2 and toDate(reg_time)=date  group by date,channel) c
     on dt1=dt3 and channel1=channel3    
 
     having order_amount+cost+reg_order_amount>0"""
@@ -107,7 +110,7 @@ select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt,
            count(distinct if(subtractDays(date, 28)>=toDate(reg_time),null,user_id)) dc28,
            count(distinct if(subtractDays(date, 29)>=toDate(reg_time),null,user_id)) dc29,
            count(distinct if(subtractDays(date, 30)>=toDate(reg_time),null,user_id)) dc30
-from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel"""
+from game_data.order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel"""
 
     data = ck.execute(sql)
     isql = "insert into dw_channel_user_daily values " \
@@ -185,7 +188,7 @@ def dw_channel_amount_daily():
            if(dt<subtractDays(today(), 88),sum(if(subtractDays(date, 90)>=toDate(reg_time),0,amount)),null) as dm3,
            if(dt<subtractDays(today(), 118),sum(if(subtractDays(date, 120)>=toDate(reg_time),0,amount)),null) as dm4,
            if(dt<subtractDays(today(), 148),sum(if(subtractDays(date, 150)>=toDate(reg_time),0,amount)),null) as dm5
- from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel  """
+ from game_data.order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel  """
     data = ck.execute(sql)
     isql = "insert into dw_channel_amount_daily values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
            "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
@@ -204,7 +207,7 @@ def dw_channel_amount_daily_reverse():
                 sum(if(addDays(date,4)=today(),amount,0)) ba5,
                 sum(if(addDays(date,6)=today(),amount,0)) ba6,
                 sum(if(addDays(date,7)=today(),amount,0)) ba7
-     from order where status=2 and reg_time>'2019-03-18 00:00:00' and date>=subtractDays(today(),7) group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel"""
+     from game_data.order where status=2 and reg_time>'2019-03-18 00:00:00' and date>=subtractDays(today(),7) group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel"""
     data = ck.execute(sql)
     isql = "replace into dw_channel_amount_daily_reverse values (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
     db.dm.execute("truncate table dw_channel_amount_daily_reverse")
@@ -243,10 +246,11 @@ def dw_channel_daily():
 
 
 if __name__ == '__main__':
+    dw_channel_daily()
     # dw_daily_channel()
     # dw_channel()
     # dw_channel_amount_daily()
     # dw_channel_user_daily()
     # dw_channel_amount_daily_reverse()
-    dw_channel()
+    # dw_channel()
     # del_channel()

+ 6 - 4
app/etl/sync_to_ck_task.py

@@ -32,8 +32,8 @@ def order_sync_ck(ymd):
         data1.append(tuple(li))
 
     # 删除分区
-    ck.execute("alter table order drop  partition '{}' ".format(ymd))
-    ck.insertMany("order", col, tuple(data1))
+    ck.execute("alter table game_data.order drop  partition '{}' ".format(ymd))
+    ck.insertMany("game_data.order", col, tuple(data1))
 
 
 # 广告计划
@@ -74,8 +74,10 @@ def campaign_vx():
 if __name__ == '__main__':
     # order_sync_ck('2021-06-03')
     # dw_order_channel_cost_sync_ck(dt.get_n_days(-2))
-    for i in dt.getDateLists('2021-09-29','2021-10-06'):
+    for i in dt.getDateLists('2021-03-19','2021-10-26'):
     # #     order(i)
     #     dw_order_channel_sync_ck(i)
         print(i)
-        order_sync_ck(i)
+        order_sync_ck(i)
+
+

+ 13 - 7
config/db_config.yaml

@@ -1,9 +1,15 @@
 quchen_text:
-  host: rm-bp1c9cj79872tx3aaro.mysql.rds.aliyuncs.com
-  user: superc
+  host: rm-bp11m4nyb4z61fy342o.mysql.rds.aliyuncs.com
+  user: qc
   passwd: Qc_1234567
   db: quchen_text
 
+db_mp:
+  host: rm-bp11m4nyb4z61fy342o.mysql.rds.aliyuncs.com
+  user: qc
+  passwd: Qc_1234567
+  db: db_mp
+
 zx:
   host: rm-bp145mi6r24ik50z5xo.mysql.rds.aliyuncs.com
   user: zx_manager
@@ -23,8 +29,8 @@ zx_platform:
   db: zx_platform
 
 dm:
-  host: rm-bp10mvfktc9o41ir91o.mysql.rds.aliyuncs.com
-  user: qucheng
+  host: rm-bp11m4nyb4z61fy342o.mysql.rds.aliyuncs.com
+  user: qc
   passwd: Qc_1234567
   db: dm
 
@@ -43,9 +49,9 @@ zx_test:
 
 
 clickhouse:
-  host: cc-bp1h3yc7o3g3o7k64o.ads.aliyuncs.com
-  user: qucheng_ck
-  passwd: Qc123456
+  host: cc-bp11803zbt0oq045io.ads.rds.aliyuncs.com
+  user: qc
+  passwd: Qc_123456
   port: 3306
 
 #clickhouse:

+ 10 - 0
model/DataBaseUtils.py

@@ -29,6 +29,16 @@ class MysqlUtils:
     def __init__(self):
         self.config = db_config()
 
+    @property
+    def db_mp(self):
+
+        conf = self.config['quchen_text']
+        self._quchen_text = MysqlOperation(host=conf['host'],
+                                           user=conf['user'],
+                                           passwd=conf['passwd'],
+                                           db=conf['db'])
+        return self._quchen_text
+
     @property
     def quchen_text(self):