ck há 4 anos atrás
pai
commit
c25dd66a12

+ 29 - 12
app/etl/data_stat_daily.py

@@ -1,25 +1,42 @@
 from model.DateUtils import DateUtils
 from data_stat_task import *
-
+from sync_to_ck_task import *
 du = DateUtils()
-
+from model.DingTalkUtils import DingTalkUtils
 yestoday = du.get_n_days(-1)
 today = du.get_n_days(0)
 
-def _channel_by_account_daily():
-    channel_by_account_daily(today)
-    channel_by_account_daily(yestoday)
 
 
-def _channel_info_daily():
-    channel_info_daily(today)
-    channel_info_daily(yestoday)
+def run(ymd):
+    """Run the daily ETL steps for one date and send an alert on failure.
+
+    The steps run in a fixed order: refresh the channel metadata, clean the
+    order table and sync it to ck, then rebuild the per-channel daily table
+    and sync it to ck as well.
+
+    :param ymd: date to process; handed unchanged to every step
+    :return: None. A failing step is reported through DingTalk and is not
+             re-raised, so the steps after it are skipped.
+    """
+    try:
+        # Official-account (channel) metadata
+        channel_by_account_daily(ymd)
+        channel_info_daily(ymd)
+
+        # Clean the order table, then sync it to ck
+        ods_order(ymd)
+        order_sync_ck(ymd)
+
+        # Channel-level daily data, then sync it to ck
+        dw_daily_channel(ymd)
+        dw_order_channel_sync_ck(ymd)
+    except Exception as e:
+        # Still best-effort (no re-raise), but the alert now carries the date
+        # and the error instead of ending in a bare colon.
+        DingTalkUtils.send("数据ETL出错:{} {!r}".format(ymd, e), '18860455786')
+
+
 
 if __name__ == '__main__':
-    ods_order(yestoday)
-    _channel_by_account_daily()
-    _channel_info_daily()
-    dw_daily_channel(yestoday)
+    run(yestoday)
+
+
+
+
+
+
+
+
+
 
 
 

+ 3 - 1
app/etl/data_stat_hourly.py

@@ -2,13 +2,15 @@ from model.DateUtils import DateUtils
 from model.log import logger
 from model.DingTalkUtils import DingTalkUtils
 from data_stat_task import *
+from sync_to_ck_task import *
 log=logger()
 du = DateUtils()
 yestoday = du.get_n_days(-1)
 today = du.get_n_days(0)
 
 if __name__ == '__main__':
+    pass
+    # 清洗订单表
     ods_order(today)
-    dw_daily_channel(today)
 
 

+ 15 - 37
app/etl/data_stat_task.py

@@ -6,12 +6,12 @@ db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
 from datetime import datetime
-
+from sync_to_ck_task import dw_order_channel_sync_ck
 
 def dw_daily_channel(ymd):
     sql="""replace into dw_daily_channel
-        select dt,x.channel,pitcher,stage,platform,book,ifnull(order_count,0),ifnull(order_user,0),ifnull(order_amount,0),ifnull(first_order_count,0),ifnull(first_order_user,0),
-               ifnull(first_order_amount,0),ifnull(view_count,0),ifnull(click_count,0),ifnull(follow_user,0),ifnull(cost,0) from
+        select dt,x.channel,pitcher,stage,platform,book,ifnull(order_count,0) order_count,ifnull(order_user,0),ifnull(order_amount,0),ifnull(first_order_count,0),ifnull(first_order_user,0),
+               ifnull(first_order_amount,0),ifnull(view_count,0),ifnull(click_count,0),ifnull(follow_user,0),ifnull(cost,0) as cost   from
         (select dt,channel,stage,pitcher,platform,book from channel_info_daily where dt='{0}' and channel!='') x
         left join
         (select  channel,count(1) as order_count,count(distinct user_id) as order_user,sum(amount) as order_amount,
@@ -26,7 +26,9 @@ def dw_daily_channel(ymd):
         union
         select account_id,cost,view_count,valid_click_count,from_follow_uv from daily_qq where date='{0} 00:00:00') a
         left join
-        (select account_id,channel from channel_by_account_daily where dt='{0}') b on a.account_id=b.account_id group by channel) z on x.channel=z.channel""".format(ymd)
+        (select account_id,channel from channel_by_account_daily where dt='{0}') b on a.account_id=b.account_id group by channel) z on x.channel=z.channel 
+        """.format(ymd)
+
     db.quchen_text.execute(sql)
 
 
@@ -102,9 +104,6 @@ def channel_info_daily(ymd):
     db.quchen_text.executeMany(insert_sql,data)
 
 
-
-
-
 def ods_order(dt):
     sql="""	replace into ods_order
 			select 
@@ -131,43 +130,22 @@ def order_account_text():
             db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i))
 
 
-def update_dw_daily_channel_all(ymd):
-    """把某天的数据都捞出来 按照注册时间 更新前面日期的数据"""
-    # sql="""select DATE_FORMAT(reg_time,'%Y-%m-%d') as dt,count(1) as  reg_user_order_count,
-    #         count(DISTINCT user_id) as reg_user,
-    #         sum(amount) as reg_user_order_amount,
-    #         sum(if(DATE_ADD(reg_time,INTERVAL 30 day)<STR_TO_DATE('2020-12-16','%Y-%m-%d '),0,amount))
-    #         from ods_order where date='2020-12-16' GROUP BY DATE_FORMAT(reg_time,'%Y-%m-%d')
-    #         """
-    """"""
-    sql="select count(1) as reg_user_order_count," \
-        "       count(DISTINCT user_id) reg_user," \
-        "       sum(amount) as reg_user_order_amount," \
-        "       sum(if(DATE_SUB(date ,INTERVAL 30 day)>STR_TO_DATE('{0}','%Y-%m-%d '),0,amount)) as reg_user_order_amount30" \
-        "        from ods_order where DATE_FORMAT(reg_time,'%Y-%m-%d')='{0}'".format(ymd)
-    data=db.quchen_text.getData(sql)
-    print(data)
-    exit(0)
-
-    update_sql="update dw_daily_channel set reg_user_order_count={},reg_user={},reg_user_order_amount={},reg_user_order_amount30={} " \
-               " where  dt='{}'".format()
-
 
 
 
 
 
 
+if __name__ == '__main__':
 
+    ods_order('2019-03-18')
 
+    for i in dt.getDateLists('2019-03-18','2020-12-17'):
+        print(i)
+        # ods_order(i)
+        # channel_by_account_daily(i)
+        # channel_info_daily(i)
+        dw_daily_channel(i)
+        dw_order_channel_sync_ck(i)
 
-if __name__ == '__main__':
-    # dw_daily_channel('2020-12-14')
-    # channel_info_daily('2020-12-07')
-    for i in dt.getDateLists('2019-03-18','2020-12-16'):
-    #     print(i)
-    #     # ods_order(i)
-    #     # channel_by_account_daily(i)
-          channel_info_daily(i)
-    #     dw_daily_channel(i)
 

+ 0 - 39
app/etl/sync_to_ck_daily.py

@@ -1,39 +0,0 @@
-"""
-数据同步到ck
-天调度
-"""
-from model.DataBaseUtils import MysqlUtils,CkUtils
-from model.DateUtils import DateUtils
-from model.log import logger
-log=logger()
-db = MysqlUtils()
-ck = CkUtils()
-dt = DateUtils()
-
-from sync_to_ck_task import *
-yestoday=dt.get_n_days(-1)
-today=dt.get_n_days()
-
-def run(ymd):
-    # 全量
-    # pitcher_change()
-    # book_change()
-    # account_change()
-    # platform_change()
-
-    # 天
-    # daily_vx_campaign(ymd)
-    # adcreative()
-
-    # 小时
-    # order(ymd)
-    # daily_vx(ymd)
-    # daily_qq(ymd)
-    pass
-
-
-
-
-
-if __name__ == '__main__':
-    order(yestoday)

+ 0 - 34
app/etl/sync_to_ck_hourly.py

@@ -1,34 +0,0 @@
-"""同步到 ck 小时调度"""
-
-from model.DataBaseUtils import MysqlUtils,CkUtils
-from model.DateUtils import DateUtils
-from model.log import logger
-from model.DingTalkUtils import DingTalkUtils
-from sync_to_ck_task import *
-log=logger()
-db = MysqlUtils()
-ck = CkUtils()
-dt = DateUtils()
-
-today=dt.get_n_days()
-# def run():
-    # advertiser_vx_qq()
-    #
-    #
-    #
-    # try:
-    #     order(ymd)
-    # except:
-    #     DingTalkUtils.send("[order]同步到ck出错:",'18860455786')
-    # try:
-    #     daily_qq(ymd)
-    # except:
-    #     DingTalkUtils.send("[daily_qq]到ck出错:", '18860455786,')
-    # try:
-    #     daily_vx(ymd)
-    # except:
-    #     DingTalkUtils.send("[daily_vx]同步到ck出错", '18860455786,')
-
-
-if __name__ == '__main__':
-    order(today)

+ 24 - 102
app/etl/sync_to_ck_task.py

@@ -6,25 +6,22 @@ log=logger()
 db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
-from datetime import datetime
 
 
-def order(ymd):
+
+def order_sync_ck(ymd):
     log.debug("sync order")
     col = "date,stage,platform,channel,channel_id,user_id,order_time,reg_time,amount,from_novel,order_id"
-
     sql ="select * from ods_order where date='{}'".format(ymd)
-
     data = db.quchen_text.getData(sql)
-
-    # # 掌中云时间格式转换
     data1 = []
     for x in data:
         li = list(x)
 
         li[0]=str(li[0])
+        li[5]='' if li[5]==None else li[5]
         li[6]=str(li[6])
-        li[7]=str(li[7])
+        li[7]='0000-00-00 00:00:00' if li[7]==None else str(li[7])
         li[9]='' if li[9]==None else li[9]
 
         data1.append(tuple(li))
@@ -34,47 +31,6 @@ def order(ymd):
     ck.insertMany("order", col, tuple(data1))
 
 
-def daily_qq(mydt):
-    log.debug("sync daily_qq")
-    table='daily_qq'
-    col=db.quchen_text.getColumn("daily_qq")
-    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
-    data = db.quchen_text.getData(sql)
-    li1 = []
-
-    for i in data:
-        li2 = list(i)
-        li2[1] = str(li2[1])
-        li2[12]=round(i[12],4)
-        li1.append(tuple(li2))
-    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
-    ck.insertMany(table, col, li1)
-
-
-def daily_vx(mydt):
-    log.debug("sync daily_vx")
-    table='daily_vx'
-    col=db.quchen_text.getColumn(table)
-    sql = "select * from {} where date='{} 00:00:00'".format(table,mydt)
-    data = db.quchen_text.getData(sql)
-    # print(len(data))
-    li1 = []
-
-
-    for i in data:
-        li2 = list(i)
-        li2[1] = str(li2[1])
-        li2[5] = round(i[5],4)
-        li2[6] = round(i[6],4)
-        li2[7] = round(i[7],4)
-        li2[8] = round(i[8],4)
-        li2[10] = round(i[10],4)
-        li1.append(tuple(li2))
-
-    ck.execute("alter table {} drop  partition '{}' ".format(table, mydt))
-    ck.insertMany(table, col, li1)
-
-
 # 广告计划
 def daily_vx_campaign(ymd):
     table = 'daily_vx_campaign'
@@ -97,58 +53,6 @@ def daily_vx_campaign(ymd):
     ck.insertMany(table, col, tuple(data1))
 
 
-def advertiser_vx_qq():
-    sql = "select account_id,name,stage,pitcher,platform,book,'vx' as tag from advertiser_vx " \
-          " union " \
-          "select account_id,name,stage,pitcher,platform,book,'qq' as tag from advertiser_qq"
-    data = db.quchen_text.getData(sql)
-    data1 = []
-
-    for i in data:
-        li = list(i)
-        for idx, val in enumerate(li):
-            if val == None:
-                li[idx] = ''
-        data1.append(tuple(li))
-
-    ck.execute("truncate table advertiser_vx_qq")
-    ck.execute("insert into advertiser_vx_qq  (account_id,name,stage,pitcher,platform,book,tag) values " + str(data1)[1:-1])
-
-
-def pitcher_change():
-    sql="select channel,pitcher,ifnull(start_time,'') as start_time," \
-        " ifnull(end_time,'') as end_time  from pitcher_change"
-    data=db.quchen_text.getData(sql)
-    ck.execute("truncate table pitcher")
-    ck.execute("insert into pitcher_change values {}".format(str(data)[1:-1]))
-
-
-def book_change():
-    table="book_change"
-    col="name,book,start_time,end_time"
-    sql="select name,book,ifnull(start_time,'') as start_time,ifnull(end_time,'') as end_time from {}".format(table)
-    data=db.quchen_text.getData(sql)
-    ck.execute("truncate table {}".format(table))
-    ck.insertMany(table,col,data)
-
-def account_change():
-    table = "account_change"
-    col = "account_id,name,pitcher,start_time,end_time"
-    sql = "select account_id,name,pitcher,ifnull(start_time,'') as start_time,ifnull(end_time,'') as end_time from {}".format(table)
-    data = db.quchen_text.getData(sql)
-    ck.execute("truncate table {}".format(table))
-    ck.insertMany(table, col, data)
-
-
-def platform_change():
-    table = "platform_change"
-    col = "name,primary_platform,current_platform,change_date"
-    sql = "select name,primary_platform,current_platform,date_format(change_date,'%Y-%m-%d') as change_date from {}".format(table)
-    data = db.quchen_text.getData(sql)
-    ck.execute("truncate table {}".format(table))
-    ck.insertMany(table, col, data)
-
-
 def adcreative():
     """todo:表需要重新设计 无法分区"""
     pass
@@ -161,7 +65,25 @@ def campaign_vx():
     # data=db.quchen_text.getData(sql)
     pass
 
+def dw_order_channel_sync_ck(ymd):
+    """Copy one day of ``dw_daily_channel`` rows from MySQL into ck.
+
+    The ck partition for ``ymd`` is dropped before the rows are inserted.
+
+    :param ymd: value matched against the ``dt`` column and used as the
+                partition name
+    """
+    table = 'dw_daily_channel'
+    columns = ck.getColumns(table, str=True)
+    rows = db.quchen_text.get_data_list(f"select * from {table} where dt='{ymd}'")
+
+    # Positions whose values are turned into strings before the insert.
+    # NOTE(review): presumably date/decimal columns -- confirm against the schema.
+    stringify_at = (0, 8, 11, 15)
+    converted = []
+    for row in rows:
+        for idx in stringify_at:
+            row[idx] = str(row[idx])
+        converted.append(tuple(row))
+
+    ck.execute(f"alter table {table} drop  partition '{ymd}' ")
+    ck.insertMany(table, columns, tuple(converted))
+
+
+
 
 if __name__ == '__main__':
-    for i in dt.getDateLists('2019-05-20','2020-12-27'):
-        order(i)
+    for i in dt.getDateLists('2019-03-18','2020-12-17'):
+    #     order(i)
+        dw_order_channel_sync_ck(i)

+ 35 - 3
model/DataBaseUtils.py

@@ -7,6 +7,7 @@ from model.DataBaseOperation import MysqlOperation
 from model.log import logger
 import yaml
 import os
+import pandas as pd
 from clickhouse_driver.client import Client
 log = logger()
 
@@ -20,6 +21,10 @@ class MysqlUtils:
         f = open(path, encoding="utf-8")
         self.config = yaml.load(f.read(), Loader=yaml.FullLoader)
 
+
+
+
+
     @property
     def quchen_text(self):
 
@@ -60,6 +65,30 @@ class CkUtils:
 
         return self.client.execute(sql)
 
+    def getData_pd(self,sql,col):
+        """Execute ``sql`` and return the result as a DataFrame.
+
+        :param sql: statement passed to ``self.execute``
+        :param col: column labels for the resulting DataFrame
+        :return: ``pandas.DataFrame`` built from the returned rows
+        """
+        rows = self.execute(sql)
+        return pd.DataFrame(rows, columns=col)
+
+
+
+
+    def getColumns(self,table,str=False):
+        """Return the column names of ``table`` as reported by ``desc``.
+
+        :param table: table name, appended to ``desc`` as-is
+        :param str: when true, return one comma-separated string;
+                    otherwise return a list of names
+        :return: ``"a,b,c"`` when ``str`` is true, else ``["a", "b", "c"]``
+        """
+        # The first field of each ``desc`` row is taken as the column name.
+        names = [row[0] for row in self.execute("desc " + table)]
+        # ``str`` is the caller's flag (it shadows the builtin in this scope),
+        # so it must not double as the accumulator: doing so made the flag
+        # irrelevant and the return type depend on whether any column existed.
+        if str:
+            return ','.join(names)
+        return names
+
+
+
+
+
 
     def insertMany(self,table,col,data):
         """
@@ -68,7 +97,7 @@ class CkUtils:
         :param data:  tuple/list
         :return:
         """
-        max=200
+        max=1000
         sql="insert into {} ({}) values ".format(table,col)
 
         if len(data) == 0:
@@ -90,5 +119,8 @@ class CkUtils:
 
 
 if __name__ == '__main__':
-    p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
-    print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+    # p_path = os.path.dirname(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+    # print(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+    ck=CkUtils()
+    print(ck.execute("desc order"))
+