ck 4 jaren geleden
bovenliggende
commit
25bc728d4b
4 gewijzigde bestanden met 53 toevoegingen en 55 verwijderingen
  1. 5 8
      app/etl/data_stat_run.py
  2. 32 20
      app/etl/data_stat_task.py
  3. 3 18
      app/etl/sync_to_ck_task.py
  4. 13 9
      model/DataBaseOperation.py

+ 5 - 8
app/etl/data_stat_run.py

@@ -1,5 +1,5 @@
 from model.DateUtils import DateUtils
-from model.log import logger
+import logging
 from model.DingTalkUtils import DingTalkUtils
 from data_stat_task import *
 from sync_to_ck_task import *
@@ -7,10 +7,10 @@ from app.etl.dm.dm_pitcher_daily_overview import dm_pitcher_daily_overview
 from app.etl.dw.dw_channel_daily import dw_daily_channel,dw_channel
 from app.etl.dw.dw_pitcher_daily import dw_pitcher_trend
 from app.etl.src.src_book_info import src_book_info
-log=logger()
 du = DateUtils()
+logger = logging.getLogger("")
+logger.setLevel(logging.ERROR)
 
-import sys
 
 def do_order(st,et):
     for i in du.getDateLists(st,et):
@@ -23,8 +23,6 @@ def do_cost(st,et):
         channel_by_account_daily(i)
         channel_info_daily(i)
         dw_daily_channel_cost(i)
-        dw_order_channel_cost_sync_ck(i)
-
 
 def main(st,et):
     try:
@@ -53,9 +51,8 @@ def daily():
     do_cost(st, et)
 
 
-
 if __name__ == '__main__':
-    hourly()
-    # do_cost('2019-09-28','2021-01-01')
+    # hourly()
+    do_cost('2020-01-01','2021-01-01')
 
     # do_order('2020-10-29','2021-02-03')

+ 32 - 20
app/etl/data_stat_task.py

@@ -6,18 +6,18 @@ db = MysqlUtils()
 ck = CkUtils()
 dt = DateUtils()
 from datetime import datetime
-from sync_to_ck_task import dw_order_channel_cost_sync_ck
-
 
 
 def dw_daily_channel_cost(ymd):
-    sql="""replace into dw_daily_channel_cost
+    print("run> dw_daily_channel_cost")
+    sql="""
         select x.dt,x.channel,pitcher,stage,x.platform,x.book,
-               ifnull(view_count,0),ifnull(click_count,0),ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
+               ifnull(view_count,0),ifnull(click_count,0),
+               ifnull(follow_user,0),ifnull(cost,0)/100 as cost,
                ifnull(web_view_count,0) web_view_count,
                ifnull(platform_view_count,0) platform_view_count,
                ifnull(web_order_count,0) web_order_count,
-               ifnull(type,''),ifnull(require_roi,0),ifnull(require_mult,0)
+               ifnull(type,''),0 require_roi,0 require_mult
                  from
         (select dt,channel,stage,pitcher,platform,book from channel_info_daily where dt='{0}' and channel!='') x  
         left join
@@ -39,18 +39,30 @@ def dw_daily_channel_cost(ymd):
         (select account_id,channel from channel_by_account_daily where dt='{0}') b on a.account_id=b.account_id  group by channel) 
         z on x.channel=z.channel 
         left join (SELECT channel,type from channel_by_account_daily GROUP By channel,type) k on x.channel=k.channel
-        left join (
-        select dt,book,platform,require_roi,require_mult from ods_book_info_daily 
-        )m on x.book=m.book and x.platform=m.platform and x.dt=m.dt
+        
 
         """.format(ymd)
-    print(sql)
-
-    db.quchen_text.execute(sql)
-
+    data=db.quchen_text.get_data_list(sql)
+    data1 = []
+    col="dt,channel,pitcher,stage,platform,book,view_count,click_count,follow_user,cost,web_view_count,platform_view_count,web_order_count,type,require_roi,require_mult"
+    for i in data:
+        i[0] = str(i[0])
+        i[9] = str(i[9])
+        i[6]=float(i[6])
+        i[7]=float(i[7])
+        i[8]=float(i[8])
+        i[9]=float(i[9])
+        i[10]=float(i[10])
+        i[11]=float(i[11])
+        i[12]=float(i[12])
+        data1.append(tuple(i))
+    ck.execute(f"alter table dw_daily_channel_cost drop  partition '{ymd}' ")
+    print(len(data1))
+    ck.insertMany("dw_daily_channel_cost", col, tuple(data1))
 
 def channel_by_account_daily(ymd):
     """返回当天消耗账户对应的公众号表"""
+    print("run> channel_by_account_daily")
     sql="""replace into channel_by_account_daily 
             select  '{0}' as dt,a.account_id as account_id, ifnull(ifnull(b.name,a.name),'') as channel,type  from
             (select  account_id,name,'qq' as type from advertiser_qq 
@@ -61,14 +73,18 @@ def channel_by_account_daily(ymd):
             (select b.account_id,b.name from
             (select min(end_time) as end_time,account_id from account_change where end_time>'{0}' GROUP BY account_id) a
             left join account_change  b on  a.end_time=b.end_time and a.account_id=b.account_id) b on a.account_id=b.account_id""".format(ymd)
-    print(sql)
+    # print(sql)
     db.quchen_text.execute(sql)
 
+
+
 def channel_info_daily(ymd):
     """获取公众号某天的期数,投手,平台,书籍
     @ return [[]]
     """
     # 获取现在的全量公众号信息
+    print("run> channel_info_daily")
+
     sql="""select '{}' as dt,a.name ,ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from (
             select  name from advertiser_vx  where name is not null group by name--  公众号全量表
             union
@@ -87,9 +103,6 @@ def channel_info_daily(ymd):
                 ) b on a.name=b.name
                 """.format(ymd)
     data=db.quchen_text.get_data_list(sql)
-
-
-
     pitcher_change=db.quchen_text.getData(
                                           "select b.channel as channel,pitcher from "
                                           "(select max(start_time) as start_time,channel from pitcher_change "
@@ -114,9 +127,7 @@ def channel_info_daily(ymd):
         for h in book_change:
             if i[1]==h[0]:
                 i[5]=h[1]
-
-
-    print(data)
+    # print(data)
     insert_sql="replace into channel_info_daily values (%s,%s,%s,%s,%s,%s) "
 
     db.quchen_text.executeMany(insert_sql,data)
@@ -147,5 +158,6 @@ def order_account_text():
         for i in f.readlines():
             db.quchen_text.execute("insert into order_account_text(platform,text) values ('文鼎','{}')".format(i))
 
-
+if __name__ == '__main__':
+    dw_daily_channel_cost('2019-10-23')
 

+ 3 - 18
app/etl/sync_to_ck_task.py

@@ -68,26 +68,11 @@ def campaign_vx():
     # data=db.quchen_text.getData(sql)
     pass
 
-def dw_order_channel_cost_sync_ck(ymd):
-    table='dw_daily_channel_cost'
-    sql = f"select * from {table} where dt='{ymd}'"
-    col=ck.getColumns(f"{table}")
-    data = db.quchen_text.get_data_list(sql)
-    data1=[]
-    for i in data:
-        i[0]=str(i[0])
-        i[9]=str(i[9])
-
-        data1.append(tuple(i))
-    ck.execute(f"alter table {table} drop  partition '{ymd}' ")
-    ck.insertMany(f"{table}", col, tuple(data1))
-
-
-
 
 if __name__ == '__main__':
-    order_sync_ck('2020-11-03')
+    # order_sync_ck('2020-11-03')
     # dw_order_channel_cost_sync_ck(dt.get_n_days(-2))
     # for i in dt.getDateLists('2019-03-18','2020-12-17'):
     # #     order(i)
-    #     dw_order_channel_sync_ck(i)
+    #     dw_order_channel_sync_ck(i)
+    print(ck.getColumns(f"dw_daily_channel_cost"))

+ 13 - 9
model/DataBaseOperation.py

@@ -70,22 +70,26 @@ class MysqlOperation:
     def execute(self, sql,data=None):
         start = time.time()
         if data:
-            self.cursor.execute(sql,data)
+            k=self.cursor.execute(sql,data)
         else:
-            self.cursor.execute(sql)
+            k=self.cursor.execute(sql)
         self.conn.commit()
-        if MYSQL_DEBUG:
+        # if MYSQL_DEBUG:
+        #
+        #     # log.info('sql: \n' + sql)
+        #     log.info('sql cost: %s' % (time.time() - start))
+        print(f"affect rows :{k}")
+
 
-            # log.info('sql: \n' + sql)
-            log.info('sql cost: %s' % (time.time() - start))
 
     def executeMany(self,sql,data):
         start = time.time()
-        self.cursor.executemany(sql,data)
+        k=self.cursor.executemany(sql,data)
         self.conn.commit()
-        if MYSQL_DEBUG:
-            log.info('sql: \n' + sql)
-            log.info('sql cost: %s' % (time.time() - start))
+        # if MYSQL_DEBUG:
+        #     log.info('sql: \n' + sql)
+        #     log.info('sql cost: %s' % (time.time() - start))
+        print(f"affect rows :{k}")
 
     def getOne(self,sql, args=None):
         result = self.getData(sql, args)