Jelajahi Sumber

ADD:添加订单数据,消耗数据埋点

cxyu 3 tahun lalu
induk
melakukan
77eda69cf6

+ 32 - 16
app/api_data/platform_order/get_order.py

@@ -7,12 +7,11 @@ from app.api_data.platform_order.youshuge import get_youshuge_order_task
 from app.api_data.platform_order.yuewen import get_yuewen_order_task
 from app.api_data.platform_order.yangguang import yangguang
 from app.api_data.platform_order.QiYue import get_qiyue_order_task
+import logging
 
 db = MysqlUtils()
 
 
-
-
 def get_new_account(plactform):
     data = db.quchen_text.getData(
         f"select text from order_account_text where platform='{plactform}' and create_time>='{du.get_n_days(-1)}'")
@@ -22,38 +21,40 @@ def get_new_account(plactform):
     return new_data
 
 
-
-
 @DingTalkDecorators("花生")
 def huasheng(start=None, end=None):
+    logging.info('花生订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
     accounts = get_account("花生")
     if len(accounts) == 0:
         return
     else:
-        print(f'花生有账号{len(accounts)}个')
+        logging.info(f'花生有账号{len(accounts)}个')
     # 花生有请求限制 不用多线程
     for account in accounts:
         get_hs_order_task(start, end, account)
-
+    logging.info('花生订单数据拉取,结束')
 
 
 @DingTalkDecorators("七悦")
 def qiyue(start=None, end=None, new=None):
+    logging.info('七悦订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
     accounts = get_account("七悦")
     if len(accounts) == 0:
         return
     else:
-        print(f'七悦有账号{len(accounts)}个')
+        logging.info(f'七悦有账号{len(accounts)}个')
     for account in accounts:
         get_qiyue_order_task(start, end, account)
+    logging.info('七悦订单数据拉取,结束')
 
 
 @DingTalkDecorators("文鼎")
 def wending(start=None, end=None, new=None):
+    logging.info('文鼎订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
 
@@ -61,12 +62,16 @@ def wending(start=None, end=None, new=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'文鼎有账号{len(accounts)}个')
+        logging.info(f'文鼎有账号{len(accounts)}个')
     for account in accounts:
         get_wd_order_task(start, end, account)
+    logging.info('文鼎订单数据拉取,结束')
+
 
 @DingTalkDecorators("国风")
 def guofeng(start=None, end=None, new=None):
+    start = start if start > '2021-07-08' else '2021-07-08'
+    logging.info('国风订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
 
@@ -74,12 +79,15 @@ def guofeng(start=None, end=None, new=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'国风有账号{len(accounts)}个')
+        logging.info(f'国风有账号{len(accounts)}个')
     for account in accounts:
         get_gf_order_task(start, end, account)
+    logging.info('国风订单数据拉取,结束')
+
 
 @DingTalkDecorators("掌读")
 def zhangdu(start=None, end=None, new=None):
+    logging.info('掌读订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
 
@@ -87,13 +95,15 @@ def zhangdu(start=None, end=None, new=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'掌读有账号{len(accounts)}个')
+        logging.info(f'掌读有账号{len(accounts)}个')
     for account in accounts:
         get_zd_order_task(start, end, account)
+    logging.info('掌读订单数据拉取,结束')
 
 
 @DingTalkDecorators("掌中云")
 def zhangzhongyun(start=None, end=None, new=None):
+    logging.info('掌中云订单数据拉取,开始')
     if start is None:
         start = du.getNow()
         end = du.get_n_days(1)
@@ -103,14 +113,16 @@ def zhangzhongyun(start=None, end=None, new=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'掌中云有账号{len(accounts)}个')
+        logging.info(f'掌中云有账号{len(accounts)}个')
     for account in accounts:
         executor.submit(get_zzy_order_task, start, end, account)
     executor.shutdown()
+    logging.info('掌中云订单数据拉取,结束')
 
 
 @DingTalkDecorators("阅文")
 def yueweng(start=None, end=None):
+    logging.info('阅文订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
 
@@ -119,27 +131,31 @@ def yueweng(start=None, end=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'阅文有账号{len(accounts)}个')
+        logging.info(f'阅文有账号{len(accounts)}个')
     for account in accounts:
         executor.submit(get_yuewen_order_task, start, end, account)
     executor.shutdown()
+    logging.info('阅文订单数据拉取,结束')
 
 
 @DingTalkDecorators("七悦有声")
 def qiyueyousheng(start=None, end=None):
+    logging.info('七悦有声订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
     accounts = get_account("七悦有声")
     if len(accounts) == 0:
         return
     else:
-        print(f'七悦有声有账号{len(accounts)}个')
+        logging.info(f'七悦有声有账号{len(accounts)}个')
     for account in accounts:
         AudioQiyue().get_order(start, end, account)
+    logging.info('七悦有声订单数据拉取,结束')
 
 
 @DingTalkDecorators("悠书阁")
 def youshuge(start=None, end=None):
+    logging.info('悠书阁订单数据拉取,开始')
     if start is None:
         start = end = du.getNow()
 
@@ -148,10 +164,11 @@ def youshuge(start=None, end=None):
     if len(accounts) == 0:
         return
     else:
-        print(f'悠书阁有账号{len(accounts)}个')
+        logging.info(f'悠书阁有账号{len(accounts)}个')
     for account in accounts:
         executor.submit(get_youshuge_order_task, start, end, account)
     executor.shutdown()
+    logging.info('悠书阁订单数据拉取,结束')
 
 
 def yestoday():
@@ -172,5 +189,4 @@ if __name__ == '__main__':
     # yangguang()
     # huasheng("2021-04-12",'2021-05-10')
     # yueweng("2021-05-11", '2021-05-11')
-    qiyue('2020-11-01','2021-06-03')
-
+    qiyue('2020-11-01', '2021-06-03')

+ 2 - 2
app/api_data/platform_order/order_util.py

@@ -83,7 +83,7 @@ def get_huasheng_order(start,end, account, merchant):
             response_result_json = r.json()
             if response_result_json['code']!=0:
                 print(response_result_json)
-                DingTalkUtils.send('花生订单接口异常'+r.text)
+                DingTalkUtils().send('花生订单接口异常'+r.text)
 
 
             if 'data' not in response_result_json.keys():
@@ -359,7 +359,7 @@ def get_gf_order_task(start,end,account):
                 page += 1
     print(f"{stage} [{start}~{end}] 有订单 {order_list.__len__()}")
     if order_list.__len__()>0:
-        # print(order_list)
+        print(order_list)
         save_order(order_list)
 
 

+ 1 - 1
app/api_data/platform_order/yangguang.py

@@ -183,7 +183,7 @@ def check():
         time.sleep(60)
         x += 1
         if x > 10:
-            DingTalkUtils.send('阳光订单回调延时10min', '15168342316')
+            DingTalkUtils().send('阳光订单回调延时10min', '15168342316')
             info = '回调未完成'
             break
     return info

+ 1 - 1
app/api_data/platform_order/yuewen.py

@@ -72,7 +72,7 @@ def get_yuewen_order_task(st, et, account):
 
             if not response_result_json.get('data'):
                 print(response_result_json)
-                DingTalkUtils.send('阅文订单拉取失败')
+                DingTalkUtils().send('阅文订单拉取失败')
 
 
             response_data = response_result_json['data']

+ 1 - 1
app/api_data/tx_ad_cost/get_cost.py

@@ -144,7 +144,7 @@ def run(dt):
 		logging.info('获取视频信息,结束')
 
 	except:
-		DingTalkUtils.send("拉取广告数据出错")
+		DingTalkUtils().send("拉取广告数据出错")
 
 
 def day():

+ 64 - 66
app/api_data/tx_ad_cost/get_cost_older.py

@@ -1,27 +1,25 @@
-import sys
 import requests
 import hashlib
 import time
 import json
-import pymysql
 import logging
 import random
 from concurrent.futures import ThreadPoolExecutor
-from datetime import datetime
 from model.DateUtils import DateUtils
 from model.DataBaseUtils import MysqlUtils
-db = MysqlUtils()
 from model.DingTalkUtils import DingTalkUtils
-# logging.getLogger().setLevel(logging.WARNING)
+
+db = MysqlUtils()
 du = DateUtils()
 
+
 def md5value(s):
-	md5 = hashlib.md5()
-	md5.update(s.encode("utf-8"))
-	return md5.hexdigest()
+    md5 = hashlib.md5()
+    md5.update(s.encode("utf-8"))
+    return md5.hexdigest()
 
 
-def daily_reports_get(access_token, account_id, st, et,level, fields):
+def daily_reports_get(access_token, account_id, st, et, level, fields):
     interface = 'daily_reports/get'
     url = 'https://api.e.qq.com/v1.1/' + interface
 
@@ -51,86 +49,83 @@ def daily_reports_get(access_token, account_id, st, et,level, fields):
 
     r = requests.get(url, params=parameters).json()
     if r['code'] != 0:
-        print(r['code'], r['message'])
-        DingTalkUtils.send('消耗日报请求超频')
+        logging.warning(str(r['code']) + str(r['message']))
+        DingTalkUtils().send('消耗日报请求超频' + '\n' + str(r['code']) + str(r['message']))
 
     return r
 
 
-def get_q_data(y,li,st,et):
-    c = daily_reports_get(y[2], y[0],st,et, "REPORT_LEVEL_ADVERTISER", (
+def get_q_data(y, li, st, et):
+    c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER", (
         'date', 'view_count', 'valid_click_count', 'ctr', 'cpc', 'cost', 'web_order_count', 'web_order_rate',
         'web_order_cost', 'follow_count', 'order_amount', 'order_roi', 'platform_page_view_count',
         'web_commodity_page_view_count', 'from_follow_uv'))
-    # print(c)
     if 'data' in c.keys() and len(c["data"]["list"]) > 0:
         for d in c['data']['list']:
             d['account_id'] = y[0]
-            print(d['account_id'], d["cost"])
+            logging.info('qq: '+str(d['account_id'])+ str(d["cost"]))
             x = tuple(d.values())
             li.append(x)
 
-def get_v_data(y,li,st,et):
-    c = daily_reports_get(y[2], y[0],st,et, "REPORT_LEVEL_ADVERTISER_WECHAT", (
+
+def get_v_data(y, li, st, et):
+    c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER_WECHAT", (
         'date', 'cost', 'view_count', 'valid_click_count', 'ctr', 'official_account_follow_rate', 'order_amount',
         'order_roi', 'order_count', 'order_rate', 'order_unit_price', 'web_order_cost', 'first_day_order_amount',
         'first_day_order_count'))
-    # print(c)
 
-    if 'data' in c.keys() and len(c["data"]["list"])>0:
+    if 'data' in c.keys() and len(c["data"]["list"]) > 0:
         for d in c['data']['list']:
             d['account_id'] = y[0]
-            print(d['account_id'], d["cost"])
+            logging.info('vx:'+str(d['account_id'])+ str(d["cost"]))
             x = tuple(d.values())
             li.append(x)
 
 
-
-
 def get_vx_list():
-	sql="select account_id,wechat_account_id,access_token,refresh_token,name," \
-		"ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
-	a= db.quchen_text.getData(sql)
-	return a
+    sql = "select account_id,wechat_account_id,access_token,refresh_token,name," \
+          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
+    a = db.quchen_text.getData(sql)
+    return a
+
 
 def get_qq_list():
-	sql = "select account_id,'',access_token,refresh_token,name," \
-		  "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
-	a = db.quchen_text.getData(sql)
-	return a
+    sql = "select account_id,'',access_token,refresh_token,name," \
+          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
+    a = db.quchen_text.getData(sql)
+    return a
 
 
-def get_daily_vx(st,et):
-	token_list_v = get_vx_list()
-	print("获取vx账号:",token_list_v.__len__())
+def get_daily_vx(st, et):
+    token_list_v = get_vx_list()
+    logging.info("获取vx账号:" + str(token_list_v.__len__()))
 
-	time1 = time.time()
-	executor = ThreadPoolExecutor(max_workers=10)
-	li=[]
-	for y in token_list_v:
-		executor.submit(get_v_data,y,li,st,et)
-	executor.shutdown()
-	print('get_daily_vx:', len(li), 'cost:', int(time.time()-time1))
+    time1 = time.time()
+    executor = ThreadPoolExecutor(max_workers=10)
+    li = []
+    for y in token_list_v:
+        executor.submit(get_v_data, y, li, st, et)
+    executor.shutdown()
+    logging.info('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
 
-	mysql_insert_daily_vx(li)
+    mysql_insert_daily_vx(li)
 
-def get_daily_qq(st,et):
-	token_list_q =  get_qq_list()
-	print("获取qq账号:",token_list_q.__len__())
-	time1 = time.time()
-	li=[]
-	executor = ThreadPoolExecutor(max_workers=10)
-	for x in token_list_q:
 
-		executor.submit(get_q_data,x,li,st,et)
-	executor.shutdown()
-	print('get_qq_order:', len(li), 'cost:', int(time.time()-time1))
-	mysql_insert_daily_qq(li)
+def get_daily_qq(st, et):
+    token_list_q = get_qq_list()
+    logging.info("获取qq账号:" + str(token_list_q.__len__()))
+    time1 = time.time()
+    li = []
+    executor = ThreadPoolExecutor(max_workers=10)
+    for x in token_list_q:
+        executor.submit(get_q_data, x, li, st, et)
+    executor.shutdown()
+    logging.info('get_qq_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
+    mysql_insert_daily_qq(li)
 
 
 def mysql_insert_daily_vx(data):
-
-    b ="""replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
+    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
 	order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
 	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
     db.quchen_text.executeMany(b, data)
@@ -140,32 +135,35 @@ def mysql_insert_daily_qq(data):
     a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
 	web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
 	from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
-    db.quchen_text.executeMany(a,data)
+    db.quchen_text.executeMany(a, data)
 
 
-def run(st,et):
-	get_daily_vx(st,et)
-	get_daily_qq(st,et)
+def run(st, et):
+    logging.info('微信消耗数据拉取,开始')
+    get_daily_vx(st, et)
+    logging.info('微信消耗数据拉取,结束')
+    logging.info('qq消耗数据拉取,开始')
+    get_daily_qq(st, et)
+    logging.info('qq消耗数据拉取,结束')
+
 
 def old_cost_hourly():
-    st = et =du.getNow()
+    st = et = du.getNow()
+    logging.info('消耗数据拉取,开始')
     run(st, et)
+    logging.info('消耗数据拉取,结束')
+
 
 def old_cost_daily():
     st = du.get_n_days(-10)
     et = du.get_n_days(-1)
     run(st, et)
 
-if __name__ == '__main__':
 
+if __name__ == '__main__':
     # run()
     # old_cost_daily()
     st = du.get_n_days(-6)
     et = du.get_n_days(-1)
-    print(st,et)
+    print(st, et)
     run(st, et)
-
-
-
-
-

+ 1 - 1
app/crontab_task/ad_hourly.py

@@ -41,7 +41,7 @@ if __name__ == '__main__':
             break
 
     if int(time.time()-st)>1500:
-        DingTalkUtils.send(f"小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
+        DingTalkUtils().send(f"小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
 
     logging.info('广告素材任务,结束')
 

+ 21 - 5
app/crontab_task/hourlyRun.py

@@ -1,24 +1,40 @@
 from app.etl import data_stat_run
 from model.DateUtils import DateUtils
 from app.crontab_task import task
-
-du = DateUtils()
-import time
 from model.DingTalkUtils import DingTalkUtils
+from logging import handlers
+import time
+import logging
 
 
 if __name__ == '__main__':
-
+    logging.basicConfig(
+        handlers=[
+            logging.handlers.RotatingFileHandler('./hourlyRun.log',
+                                                 maxBytes=10 * 1024 * 1024,
+                                                 backupCount=5,
+                                                 encoding='utf-8')
+            , logging.StreamHandler()  # 供输出使用
+        ],
+        level=logging.INFO,
+        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
+    )
+    du = DateUtils()
     st = time.time()
 
+    logging.info('订单与消耗数据拉取,开始')
     # 订单数据和消耗数据拉取
     task.hourly()
+    logging.info('订单与消耗数据拉取,结束')
 
+    logging.info('订单与消耗数据处理,开始')
     # 数据处理
     data_stat_run.hourly()
+    logging.info('订单与消耗数据处理,结束')
+
 
     if int(time.time()-st)>1500:
-        DingTalkUtils.send(f"小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
+        DingTalkUtils().send(f"小时任务耗时{int(time.time()-st)}秒",phone="15168342316")
 
 
 

+ 28 - 19
app/crontab_task/task.py

@@ -1,11 +1,12 @@
-
 import threading
 from app.api_data.platform_order.get_order import *
-from app.api_data.tx_ad_cost.get_cost_older import old_cost_hourly,old_cost_daily
+from app.api_data.tx_ad_cost.get_cost_older import old_cost_hourly, old_cost_daily
 from app.etl.sync_to_ck_task import order_sync_ck
 from app.api_data.platform_order import yangguang
 from app.api_data.tx_ad_cost import get_cost_older
 from app.etl.data_stat_run import do_cost
+
+
 def hourly():
     t1 = threading.Thread(target=huasheng)
     t2 = threading.Thread(target=qiyue)
@@ -16,28 +17,37 @@ def hourly():
     t7 = threading.Thread(target=yueweng)
     t8 = threading.Thread(target=yangguang.yangguang)
     t9 = threading.Thread(target=youshuge)
-    t10 = threading.Thread(target=old_cost_hourly)
-    t11 = threading.Thread(target=guofeng)
+    t10 = threading.Thread(target=guofeng)
+    t11 = threading.Thread(target=old_cost_hourly)
     t1.start()
     t1.join()
     t2.start()
     t2.join()
+
     t3.start()
     t3.join()
+
     t4.start()
     t4.join()
+
     t5.start()
     t5.join()
+
     t6.start()
     t6.join()
+
     t7.start()
     t7.join()
+
     t8.start()
     t8.join()
+
     t9.start()
     t9.join()
+
     t10.start()
     t10.join()
+
     t11.start()
     t11.join()
 
@@ -46,18 +56,17 @@ def daily():
     st = du.get_n_days(-10)
     et = du.get_n_days(-1)
 
-
-    t1 = threading.Thread(target=huasheng,args=(st,et))
-    t2 = threading.Thread(target=qiyue,args=(st,et))
-    t3 = threading.Thread(target=qiyueyousheng,args=(st,et))
-    t4 = threading.Thread(target=wending,args=(st,et))
-    t5 = threading.Thread(target=zhangdu,args=(st,et))
-    t6 = threading.Thread(target=zhangzhongyun,args=(st,et))
-    t7 = threading.Thread(target=yueweng,args=(st,et))
-    t8 = threading.Thread(target=yangguang,args=(st,et))
-    t9 = threading.Thread(target=youshuge,args=(st,et))
-    t10 = threading.Thread(target=old_cost_daily,args=(st,et))
-    t11 = threading.Thread(target=wending,args=(st,et))
+    t1 = threading.Thread(target=huasheng, args=(st, et))
+    t2 = threading.Thread(target=qiyue, args=(st, et))
+    t3 = threading.Thread(target=qiyueyousheng, args=(st, et))
+    t4 = threading.Thread(target=wending, args=(st, et))
+    t5 = threading.Thread(target=zhangdu, args=(st, et))
+    t6 = threading.Thread(target=zhangzhongyun, args=(st, et))
+    t7 = threading.Thread(target=yueweng, args=(st, et))
+    t8 = threading.Thread(target=yangguang, args=(st, et))
+    t9 = threading.Thread(target=youshuge, args=(st, et))
+    t10 = threading.Thread(target=guofeng, args=(st, et))
+    t11 = threading.Thread(target=old_cost_daily, args=(st, et))
     t1.start()
     t1.join()
     t2.start()
@@ -86,8 +95,8 @@ def daily():
 
 def cost_yestoday_repair():
     dt = du.get_n_days(-1)
-    get_cost_older.run(dt,dt)
-    do_cost(dt,dt)
+    get_cost_older.run(dt, dt)
+    do_cost(dt, dt)
 
 
 def yueweng_order_repair():
@@ -98,4 +107,4 @@ def yueweng_order_repair():
 
 if __name__ == '__main__':
     # hourly()
-    cost_yestoday_repair()
+    cost_yestoday_repair()

+ 11 - 4
app/etl/data_stat_run.py

@@ -12,22 +12,29 @@ from app.etl.src import book_annual_expect_profit
 
 du = DateUtils()
 logger = logging.getLogger("")
+
+
 # logger.setLevel(logging.ERROR)
 # logging.getLogger().setLevel(logging.WARNING)
 
 
 def do_order(st, et):
+    logging.info('订单数据同步到ck,开始')
     for i in du.getDateLists(st, et):
-        print(i)
+        logging.info('订单:' + str(i))
         order_sync_ck(i)
+    logging.info('订单数据同步到ck,结束')
+
 
 
 def do_cost(st, et):
+    logging.info('消耗数据处理,开始')
     for i in du.getDateLists(st, et):
-        print(i)
+        logging.info("消耗:" + str(i))
         channel_by_account_daily(i)
         channel_info_daily(i)
         dw_daily_channel_cost(i)
+    logging.info('消耗数据处理,结束')
 
 
 def main(st, et):
@@ -41,8 +48,8 @@ def main(st, et):
         book_trend()
         dm_pitcher_daily_overview()
     except Exception as e:
-        print(e)
-        DingTalkUtils.send("hourlyRun fail!! " + str(e), '15168342316')
+        logging.error(e)
+        DingTalkUtils().send("hourlyRun fail!! " + str(e), '15168342316')
 
 
 def hourly():

+ 51 - 53
app/etl/data_stat_task.py

@@ -1,14 +1,15 @@
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
 from model.DateUtils import DateUtils
+import logging
+
 db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
-from datetime import datetime
 
 
 def dw_daily_channel_cost(ymd):
-    print("run> dw_daily_channel_cost")
-    sql="""
+    logging.info("run> dw_daily_channel_cost")
+    sql = """
         select x.dt,x.channel,pitcher,stage,x.platform,x.book,
                ifnull(view_count,0),ifnull(click_count,0),
                ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
@@ -43,28 +44,29 @@ def dw_daily_channel_cost(ymd):
         
         
         """.format(ymd)
-    data=db.quchen_text.get_data_list(sql)
+    data = db.quchen_text.get_data_list(sql)
     data1 = []
-    col="dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult"
+    col = "dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult"
     for i in data:
         i[0] = str(i[0])
         i[9] = str(i[9])
-        i[6]=float(i[6])
-        i[7]=float(i[7])
-        i[8]=float(i[8])
-        i[9]=float(i[9])
-        i[10]=float(i[10])
-        i[11]=float(i[11])
-        i[12]=float(i[12])
+        i[6] = float(i[6])
+        i[7] = float(i[7])
+        i[8] = float(i[8])
+        i[9] = float(i[9])
+        i[10] = float(i[10])
+        i[11] = float(i[11])
+        i[12] = float(i[12])
         data1.append(tuple(i))
     ck.execute(f"alter table dw_daily_channel_cost drop  partition '{ymd}' ")
-    print(len(data1))
+    logging.info(len(data1))
     ck.insertMany("dw_daily_channel_cost", col, tuple(data1))
 
+
 def channel_by_account_daily(ymd):
     """返回当天消耗账户对应的公众号表"""
-    print("run> channel_by_account_daily")
-    sql="""replace into channel_by_account_daily 
+    logging.info("run> channel_by_account_daily")
+    sql = """replace into channel_by_account_daily 
             select  '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type  from
             (select  account_id,name,'GDT' type  from advertiser_qq 
              union 
@@ -73,22 +75,19 @@ def channel_by_account_daily(ymd):
                 left join
             (select b.account_id,b.name from
             (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a
-            left join account_change  b on  a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(ymd)
-    # print(sql)
+            left join account_change  b on  a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(
+        ymd)
     db.quchen_text.execute(sql)
 
 
-
-
-
 def channel_info_daily(ymd):
     """获取公众号某天的期数,投手,平台,书籍
     @ return [[]]
     """
     # 获取现在的全量公众号信息
-    print("run> channel_info_daily")
+    logging.info("run> channel_info_daily")
 
-    sql="""select '{}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
+    sql = """select '{}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
             select  name from advertiser_vx  where name is not null group by name--  公众号全量表
             union
             select  name from advertiser_qq where name is not null group by name
@@ -105,20 +104,22 @@ def channel_info_daily(ymd):
                 select name,ifnull(stage,'') stage,ifnull(pitcher,'') pitcher,ifnull(platform,'') platform,ifnull(book,'') book from advertiser_vx where name is not null and name !=''
                 ) b on a.name=b.name
                 """.format(ymd)
-    data=db.quchen_text.get_data_list(sql)
-    pitcher_change=db.quchen_text.getData(
-                                          "select b.channel as channel,pitcher from "
-                                          "(select max(start_time) as start_time,channel from pitcher_change "
-                                     " where start_time<='{}' GROUP BY channel) a"
-                                     " left join pitcher_change  b on  a.start_time=b.start_time and a.channel=b.channel".format(ymd))
-
-    platform_change=db.quchen_text.getData("select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change "
-                                          "where change_date<='{}' GROUP BY name) a "
-                                          "left join platform_change  b on  a.change_date=b.change_date and a.name=b.name".format(ymd))
-
-    book_change=db.quchen_text.getData("select b.name as channel,book from (select max(start_time) as start_time,name from book_change "
-                                           "where start_time<='{}' GROUP BY name) a "
-                                           "left join book_change  b on  a.start_time=b.start_time and a.name=b.name".format(ymd))
+    data = db.quchen_text.get_data_list(sql)
+    pitcher_change = db.quchen_text.getData(
+        "select b.channel as channel,pitcher from "
+        "(select max(start_time) as start_time,channel from pitcher_change "
+        " where start_time<='{}' GROUP BY channel) a"
+        " left join pitcher_change  b on  a.start_time=b.start_time and a.channel=b.channel".format(ymd))
+
+    platform_change = db.quchen_text.getData(
+        "select b.name as channel,current_platform as platform from (select max(change_date) as change_date,name from platform_change "
+        "where change_date<='{}' GROUP BY name) a "
+        "left join platform_change  b on  a.change_date=b.change_date and a.name=b.name".format(ymd))
+
+    book_change = db.quchen_text.getData(
+        "select b.name as channel,book from (select max(start_time) as start_time,name from book_change "
+        "where start_time<='{}' GROUP BY name) a "
+        "left join book_change  b on  a.start_time=b.start_time and a.name=b.name".format(ymd))
 
     stage_change = db.quchen_text.getData(
         "select  channel,stage from (select max(start_date) as start_date,channel from stage_change "
@@ -127,29 +128,26 @@ def channel_info_daily(ymd):
 
     for i in data:
         for j in pitcher_change:
-            if i[1]==j[0]:
-                i[3]=j[1]
+            if i[1] == j[0]:
+                i[3] = j[1]
         for k in platform_change:
-            if i[1]==k[0]:
-
-                i[4]=k[1]
+            if i[1] == k[0]:
+                i[4] = k[1]
         for h in book_change:
-            if i[1]==h[0]:
-                i[5]=h[1]
+            if i[1] == h[0]:
+                i[5] = h[1]
 
         for m in stage_change:
-            if i[1] ==m[0]:
-                i[2]=m[1]
-
+            if i[1] == m[0]:
+                i[2] = m[1]
 
+    insert_sql = "replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
 
-    insert_sql="replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
-
-    db.quchen_text.executeMany(insert_sql,data)
+    db.quchen_text.executeMany(insert_sql, data)
 
 
 def ods_order(dt):
-    sql="""	replace into ods_order
+    sql = """	replace into ods_order
 			select 
 			case platform when '掌中云' then DATE_FORMAT(STR_TO_DATE(order_time,'%Y-%m-%dT%H:%i:%s'),'%Y-%m-%d') 
             when '掌读' then from_unixtime(order_time, '%Y-%m-%d') 
@@ -168,11 +166,11 @@ def ods_order(dt):
 
 def order_account_text():
     db.quchen_text.execute("truncate order_account_text")
-    with open('./wending_account_config.csv',encoding='utf-8') as f:
-
+    with open('./wending_account_config.csv', encoding='utf-8') as f:
         for i in f.readlines():
             db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i))
 
+
 if __name__ == '__main__':
     # channel_info_daily('2021-02-06')
 

+ 14 - 8
app/etl/dm/dm_pitcher_daily_overview.py

@@ -3,14 +3,18 @@
 @auth ck
 """
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
+import logging
+
 du = DateUtils()
-db= MysqlUtils()
+db = MysqlUtils()
 ck = CkUtils()
 
+
 def dm_pitcher_daily_overview():
-    print("run> dm_pitcher_daily_overview")
-    sql=f"""
+    logging.info('投手累计数据处理,开始')
+    logging.info("run> dm_pitcher_daily_overview")
+    sql = f"""
 select 
         q.pitcher,
        channel_count,
@@ -79,9 +83,11 @@ left outer join (
     group by pitcher
 ) g on q.pitcher=g.pitcher
 """
-    data=ck.execute(sql)
-# print(data)
-    db.dm.executeMany("replace into dm_pitcher_daily_overview values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
+    data = ck.execute(sql)
+    # print(data)
+    db.dm.executeMany("replace into dm_pitcher_daily_overview values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", data)
+    logging.info('投手累计数据处理,结束')
+
 
 if __name__ == '__main__':
-    dm_pitcher_daily_overview()
+    dm_pitcher_daily_overview()

+ 7 - 4
app/etl/dw/dw_book_trend.py

@@ -2,16 +2,18 @@
 @desc 书维度全量表
 @auth ck
 """
-import time
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
+import logging
+
 du = DateUtils()
-db= MysqlUtils()
+db = MysqlUtils()
 ck = CkUtils()
 
 
 def book_trend():
-    sql="""insert into book_trend
+    logging.info('书籍趋势数据处理,开始')
+    sql = """insert into book_trend
     select a.dt,book,type,
 sum(cost) cost,
 sum(order_count),
@@ -38,6 +40,7 @@ where a.book!=''
 GROUP BY dt,book,type"""
     db.dm.execute('truncate table book_trend')
     db.dm.execute(sql)
+    logging.info('书籍趋势数据处理,结束')
 
 
 if __name__ == '__main__':

+ 40 - 23
app/etl/dw/dw_channel_daily.py

@@ -3,16 +3,17 @@
 
 @auth ck
 """
-import time
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
+import logging
+
 du = DateUtils()
 db = MysqlUtils()
 ck = CkUtils()
 
 
 def dw_channel():
-    sql="""
+    sql = """
 select
        dt1,channel1,pitcher,stage,platform,book,if(stage ='趣程15期' or stage ='趣程26期' or stage ='趣程30期','GDT','MP') type,
        order_count,order_user,order_amount,
@@ -66,14 +67,14 @@ left outer join
 
     having order_amount+cost+reg_order_amount>0"""
 
-    data=ck.execute(sql)
-    isql="insert into dw_channel values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+    data = ck.execute(sql)
+    isql = "insert into dw_channel values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
     db.dm.execute("truncate table dw_channel")
-    db.dm.executeMany(isql,data)
+    db.dm.executeMany(isql, data)
 
 
 def dw_channel_user_daily():
-    sql="""
+    sql = """
 select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt,
             channel,
            count(distinct if(subtractDays(date, 1)>=toDate(reg_time),null,user_id)) dc1,
@@ -108,17 +109,17 @@ select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt,
            count(distinct if(subtractDays(date, 30)>=toDate(reg_time),null,user_id)) dc30
 from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel"""
 
-    data =ck.execute(sql)
-    isql="insert into dw_channel_user_daily values " \
-         "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
-         "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
-         "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+    data = ck.execute(sql)
+    isql = "insert into dw_channel_user_daily values " \
+           "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
+           "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
+           "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
     db.dm.execute("truncate table dw_channel_user_daily")
-    db.dm.executeMany(isql,data)
+    db.dm.executeMany(isql, data)
 
 
 def dw_channel_amount_daily():
-    sql="""
+    sql = """
   select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt,
    channel as channel,
            sum(if(subtractDays(date, 1)>=toDate(reg_time),0,amount)) as da1,
@@ -185,11 +186,11 @@ def dw_channel_amount_daily():
            if(dt<subtractDays(today(), 118),sum(if(subtractDays(date, 120)>=toDate(reg_time),0,amount)),null) as dm4,
            if(dt<subtractDays(today(), 148),sum(if(subtractDays(date, 150)>=toDate(reg_time),0,amount)),null) as dm5
  from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel  """
-    data=ck.execute(sql)
-    isql="insert into dw_channel_amount_daily values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
-         "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+    data = ck.execute(sql)
+    isql = "insert into dw_channel_amount_daily values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
+           "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
     db.dm.execute("truncate table dw_channel_amount_daily")
-    db.dm.executeMany(isql,data)
+    db.dm.executeMany(isql, data)
 
 
 def dw_channel_amount_daily_reverse():
@@ -215,21 +216,37 @@ def del_channel():
     db.dm.execute(sql)
 
 
-
 def dw_channel_daily():
-    print('run> dw_channel')
-    dw_channel()   # 公众号基本数据
+    logging.info('公众号数据处理,开始')
+    logging.info('run> dw_channel')
+    logging.info('公众号基本数据处理,开始')
+    dw_channel()  # 公众号基本数据
+    logging.info('公众号基本数据处理,开始')
+
+    logging.info('用户激活数据处理,开始')
     dw_channel_user_daily()  # 用户激活
+    logging.info('用户激活数据处理,结束')
+
+    logging.info('用户充值数据处理,开始')
     dw_channel_amount_daily()  # 用户充值
+    logging.info('用户充值数据处理,结束')
+
+    logging.info('用户充值数据倒序处理,开始')
     dw_channel_amount_daily_reverse()  # 用户充值倒序
+    logging.info('用户充值数据倒序处理,结束')
+
+    logging.info('删除代投,开始')
     del_channel()  # 删除代投的号
+    logging.info('删除代投,结束')
 
-if __name__ == '__main__':
+    logging.info('公众号数据处理,开始')
 
+
+if __name__ == '__main__':
     # dw_daily_channel()
     # dw_channel()
     # dw_channel_amount_daily()
     # dw_channel_user_daily()
     # dw_channel_amount_daily_reverse()
     dw_channel()
-    # del_channel()
+    # del_channel()

+ 1 - 1
app/etl/dw/dw_image_cost_day.py

@@ -102,7 +102,7 @@ def hourly():
         run(du.getNow())
         logging.info('广告数据清洗,结束')
     except:
-        DingTalkUtils.send("广告数据清洗失败")
+        DingTalkUtils().send("广告数据清洗失败")
 
 def day():
     logging.info('广告数据清洗,开始')

+ 8 - 7
app/etl/dw/dw_pitcher_daily.py

@@ -2,17 +2,19 @@
 @desc 投手维度全量表
 @auth ck
 """
-import time
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
+import logging
+
 du = DateUtils()
-db= MysqlUtils()
+db = MysqlUtils()
 ck = CkUtils()
 
 
 def dw_pitcher_trend():
-    print('run> dw_pitcher_trend')
-    sql="""insert into dw_pitcher_trend 
+    logging.info('投手趋势数据处理,开始')
+    logging.info('run> dw_pitcher_trend')
+    sql = """insert into dw_pitcher_trend 
     select a.dt,pitcher,
        sum(cost),
        sum(order_amount),
@@ -39,9 +41,8 @@ group by pitcher,a.dt"""
 
     db.dm.execute("truncate table dw_pitcher_trend")
     db.dm.execute(sql)
+    logging.info('投手趋势数据处理,结束')
 
 
 if __name__ == '__main__':
     dw_pitcher_trend()
-
-

+ 22 - 20
app/etl/src/src_book_info.py

@@ -1,43 +1,45 @@
-
-import time
 from model.DingTalkUtils import DingTalkUtils
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
+import logging
+
 du = DateUtils()
-db= MysqlUtils()
+db = MysqlUtils()
 ck = CkUtils()
 
+
 def src_book_info():
-    print("run> src_book_info")
-    sql="""SELECT  date_format(a.effect_day,'%Y-%m-%d'),
+    logging.info('书籍卡点信息,开始')
+    logging.info("run> src_book_info")
+    sql = """SELECT  date_format(a.effect_day,'%Y-%m-%d'),
             case a.type when 'mp' then 'MP' when 'gdt' then 'GDT' end type,
             b.book_name,c.platform_name,a.node_price from t_platform_book_ratio a 
             left join  t_platform_book b on a.platform_book_id=b.id 
             left join t_platform_novel c on b.platform_id=c.id  order by a.effect_day"""
 
-    data =db.zx.getData(sql)
+    data = db.zx.getData(sql)
 
-    di={}
+    di = {}
     for i in data:
         thedate = i[0]
         while True:
-            if thedate>du.get_n_days(0):
+            if thedate > du.get_n_days(0):
                 break
             else:
-                di[thedate+','+i[1]+','+i[2]+','+i[3]]=i[4]
-                thedate=du.add_days(thedate,1)
-    li=[]
-    for i,j in di.items():
-        li.append(i.split(',')+[j])
+                di[thedate + ',' + i[1] + ',' + i[2] + ',' + i[3]] = i[4]
+                thedate = du.add_days(thedate, 1)
+    li = []
+    for i, j in di.items():
+        li.append(i.split(',') + [j])
 
     db.dm.execute("truncate table src_book_info")
-    db.dm.executeMany("insert into src_book_info(dt,type,book,platform,node) values (%s,%s,%s,%s,%s)",li)
-    print('src_book_info success')
+    db.dm.executeMany("insert into src_book_info(dt,type,book,platform,node) values (%s,%s,%s,%s,%s)", li)
+    logging.info('src_book_info success')
+    logging.info('书籍卡点信息,结束')
+
+
 if __name__ == '__main__':
     try:
         src_book_info()
     except Exception as e:
-        DingTalkUtils.send(str(e))
-        
-
-
+        DingTalkUtils().send(str(e))

+ 0 - 1
app/etl/sync_to_ck_task.py

@@ -77,4 +77,3 @@ if __name__ == '__main__':
     # for i in dt.getDateLists('2019-03-18','2020-12-17'):
     # #     order(i)
     #     dw_order_channel_sync_ck(i)
-    # print(ck.getColumns(f"dw_daily_channel_cost"))

+ 2 - 2
app/moniter/dd_daily.py

@@ -19,7 +19,7 @@ def cost_data(dt=du.get_n_days(-1)):
     b = db.quchen_text.getOne(sql2)
     c = db.quchen_text.getOne(sql3)
     d = db.quchen_text.getOne(sql4)
-    DingTalkUtils.send("[{}]\norder: {}\ndaily_qq: {}\ndaily_vx: {}\ndaily_vx_campaign: {}".format(dt,a,b,c,d))
+    DingTalkUtils().send("[{}]\norder: {}\ndaily_qq: {}\ndaily_vx: {}\ndaily_vx_campaign: {}".format(dt,a,b,c,d))
 
 
 def order_data():
@@ -31,7 +31,7 @@ def order_data():
         info += f"{i[0]}: {i[1]}\n"
 
     # print(info)
-    DingTalkUtils.send(info)
+    DingTalkUtils().send(info)
 
 
 

+ 9 - 1
example/update_order_data.py

@@ -67,5 +67,13 @@ def update_order():
     book_trend()
     dm_pitcher_daily_overview()
 
+
+def update_order_do():
+    st = du.get_n_days(-10)
+    et = du.get_n_days(0)
+    print(st, et)
+    do_order(st, et)
+
 if __name__=='__main__':
-    update_order()
+    # yangguang.get_channel_info()
+    update_order_do()

+ 0 - 1
model/DataBaseUtils.py

@@ -137,7 +137,6 @@ class CkUtils:
         df = pd.DataFrame.from_records(data, columns=columns)
         return df
 
-        # print(df.to_json(orient='records'))
 
     def getData_json(self,sql):
         return self.getData_pdv2(sql).to_json(orient='records')

+ 2 - 2
model/DingTalkUtils.py

@@ -18,7 +18,7 @@ class DingTalkUtils:
             可多个,按英文逗号隔开
             可填 【ramdom】 随机发送
     """
-    def send(msg, phone=""):
+    def send(self,msg, phone=""):
         if phone == "":
             isAtall = True
             atMobiles = []
@@ -47,7 +47,7 @@ def DingTalkDecorators(msg):
             try:
                 func(*args, **kwargs)
             except Exception as e:
-                DingTalkUtils.send(msg+"->运行出错: "+str(e))
+                DingTalkUtils().send(msg+"->运行出错: "+str(e))
 
         return deco
     return wrapper