ck 4 anni fa
parent
commit
aa1d7d4ba9

+ 13 - 13
app/etl/data_stat_daily.py

@@ -6,7 +6,7 @@ from model.DingTalkUtils import DingTalkUtils
 from app.etl.dm.dm_pitcher_daily_overview import dm_pitcher_daily_overview
 yestoday = du.get_n_days(-1)
 today = du.get_n_days(0)
-
+"""凌晨5点跑昨天的数据"""
 
 
 def run(ymd):
@@ -29,7 +29,7 @@ def run(ymd):
 
 def run_order_history():
     """当order表跑了历史的数据,要从这历史数据的最早时间跑此任务流"""
-    for i in du.getDateLists('2020-11-01',du.get_n_days(0)):
+    for i in du.getDateLists('2020-01-01',du.get_n_days(0)):
         print(i)
         ods_order(i)
         order_sync_ck(i)
@@ -40,25 +40,25 @@ def run_order_history():
 
 def run_cost_history():
     "cost 重跑后,跑此任务流"
-    for i in du.getDateLists('2020-11-01',du.get_n_days(0)):
+    for i in du.getDateLists('2019-03-18',du.get_n_days(0)):
         print(i)
+        channel_by_account_daily(i)
+        channel_info_daily(i)
+
         dw_daily_channel_cost(i)
         dw_order_channel_cost_sync_ck(i)
         dw_channel_daily_total(i)
     dw_daily_channel()
 
-
-
-def main():
-    run(yestoday)
-    dw_daily_channel()
-    dm_pitcher_daily_overview()
-
-
+import sys
 if __name__ == '__main__':
-   # main()
+    if len(sys.argv)==1:
+        run(yestoday)
+    else:
+
+        for i in du.getDateLists(sys.argv[1],sys.argv[2]):
+            run(i)
 
-    run_order_history()
 
 
 

+ 3 - 11
app/etl/dm/dm_pitcher_daily_overview.py

@@ -10,11 +10,8 @@ ck = CkUtils()
 
 def dm_pitcher_daily_overview():
     sql=f"""
-select '{du.get_n_days(0)}' dt,
-       q.pitcher,
-       total_cost,
-       total_amount,
-       if(total_cost=0,0,total_amount/total_cost) roi,
+select 
+        q.pitcher,
        channel_count,
        on_channel_count,
        channel_count-on_channel_count off_channel_count,
@@ -80,15 +77,10 @@ left outer join (
     where reg_time >= '{du.get_n_pre_month_first_day(0)} 00:00:00' 
     group by pitcher
 ) g on q.pitcher=g.pitcher
-    left outer join(
-select pitcher,total_amount,total_cost from dw_daily_pitcher where dt='{du.get_n_days(0)}'
-) k on  q.pitcher=k.pitcher
-    
-
 """
     data=ck.execute(sql)
 # print(data)
-    db.quchen_text.executeMany("replace into dm_pitcher_daily_overview values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
+    db.quchen_text.executeMany("replace into dm_pitcher_daily_overview values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
 
 if __name__ == '__main__':
     dm_pitcher_daily_overview()

+ 73 - 0
app/etl/dw/dw_channel_daily.py

@@ -0,0 +1,73 @@
+"""
+@desc 号维度全量表 ck上跑完dw_daily_channel 并同步到mysql dw_channel_daily
+@auth ck
+"""
+import time
+from model.DateUtils import DateUtils
+from model.DataBaseUtils import MysqlUtils,CkUtils
+du = DateUtils()
+db= MysqlUtils()
+ck = CkUtils()
+
+
+def dw_daily_channel():
+
+    sql="""insert into dw_daily_channel
+select
+       dt,channel,pitcher,stage,platform,book,
+       order_count,order_user,order_amount,first_order_count,first_order_user,first_order_amount,
+       view_count,click_count,follow_user,cost,reg_order_count,reg_order_user,
+       reg_order_amount,reg_order_amount30,web_view_count,platform_view_count,web_order_count,total_cost,
+       total_amount,reg_order_user_again,reg_order_user7,reg_order_user30,reg_order_amount7,type,
+       total_first_amount 
+from
+(select dt,channel, pitcher,stage,platform,book,cost,view_count,click_count,follow_user,web_view_count,platform_view_count,web_order_count,type from dw_daily_channel_cost) a
+    left outer join
+    (select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt2,channel as channel2,
+       sum(amount) as reg_order_amount,
+       count(distinct user_id) as reg_order_user,
+       count(1) as reg_order_count,
+       sum(if(subtractDays(date, 30)>reg_time,0,amount)) as reg_order_amount30,
+       count(distinct if(subtractDays(date, 7)>reg_time,'',user_id))-1 reg_order_user7,
+       sum(if(subtractDays(date, 7)>reg_time,0,amount)) as reg_order_amount7,
+       count(distinct if(subtractDays(date, 30)>reg_time,'',user_id))-1 reg_order_user30
+       from order where reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel) b
+        on dt=dt2 and channel=channel2 left outer join
+     (select date as dt3,channel as channel3,
+     count(1) as first_order_count,
+        count(distinct user_id) as first_order_user,
+             sum(amount) as first_order_amount
+       from order where toDate(reg_time)=date  group by date,channel) c
+        on dt=dt3 and channel=channel3
+    left outer join
+    (select date as dt4,channel as channel4,
+     count(1) as order_count,
+        count(distinct user_id) as order_user,
+             sum(amount) as order_amount
+       from order  group by date,channel) d
+        on dt=dt4 and channel=channel4
+left outer join
+     (select dt dt5,channel channel5,total_cost,total_amount,total_first_amount from dw_channel_daily_total ) e
+         on dt=dt5 and channel=channel5
+
+left outer join (
+select sum(if(user_order_count>1,1,0)) reg_order_user_again,channel channel6,toDate(reg_date) dt6  from (
+select formatDateTime(reg_time,'%Y-%m-%d') reg_date,channel,count(1) user_order_count
+    from order group by formatDateTime(reg_time,'%Y-%m-%d') ,user_id,channel
+    ) x group by reg_date,channel
+    ) f on dt=dt6 and channel=channel6
+        """
+    ck.execute("truncate table dw_daily_channel")
+    ck.execute(sql)
+    print("ok")
+
+    data=ck.execute('select * from dw_daily_channel')
+    insert_sql="replace into dw_channel_daily values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+    st=time.time()
+    db.quchen_text.executeMany(insert_sql,data)
+    print(time.time()-st)
+
+
+if __name__ == '__main__':
+
+    dw_daily_channel()

+ 45 - 0
app/etl/dw/dw_pitcher_daily.py

@@ -0,0 +1,45 @@
+"""
+@desc 投手维度全量表
+@auth ck
+"""
+import time
+from model.DateUtils import DateUtils
+from model.DataBaseUtils import MysqlUtils,CkUtils
+du = DateUtils()
+db= MysqlUtils()
+ck = CkUtils()
+
+
+def dw_pitcher_daily():
+    sql="""
+    select dt,pitcher,sum(order_amount),sum(cost),sum(reg_order_amount),sum(first_order_amount),0,0
+    from dw_daily_channel group by dt,pitcher
+    """
+    data = ck.execute(sql)
+    # db.quchen_text.execute('truncate table dw_pitcher_daily')
+    db.quchen_text.executeMany('replace into dw_pitcher_daily values (%s,%s,%s,%s,%s,%s,%s,%s)',data)
+
+    add_total()
+
+
+def add_total():
+    sql='select pitcher from dw_pitcher_daily group by pitcher'
+    pitchers= [i[0] for i in db.quchen_text.getData(sql)]
+
+    sql2="""replace into dw_pitcher_daily
+    SELECT x.dt,x.pitcher,amount,cost,reg_amount,first_reg_amount,x.total_cost,x.total_amount FROM
+    
+    (SELECT dt,pitcher,@total_sum:=@total_sum+reg_amount total_amount,@total_cost:=@total_cost+cost total_cost from 
+        (SELECT dt,pitcher,reg_amount,cost FROM dw_pitcher_daily where pitcher='{}' order by dt) b,(SELECT @total_sum:=0,@total_cost:=0) a 
+    ) x 
+    left join dw_pitcher_daily y on x.dt=y.dt and x.pitcher=y.pitcher"""
+
+    for i in pitchers:
+        db.quchen_text.execute(sql2.format(i))
+
+    db.quchen_text.execute('delete from dw_pitcher_daily where total_cost=0')
+
+
+if __name__ == '__main__':
+    dw_pitcher_daily()
+