Bladeren bron

MOD:添加实例--更新广告信息模块

cxyu 4 jaren geleden
bovenliggende
commit
cd81c4ede1
5 gewijzigde bestanden met toevoegingen van 258 en 37 verwijderingen
  1. 4 2
      app/etl/sync_to_ck_task.py
  2. 130 5
      example/bytedance_info_tmp_insert.py
  3. 4 6
      example/get_ad_cost_min.py
  4. 101 12
      example/update_cost_data.py
  5. 19 12
      example/update_order_data.py

+ 4 - 2
app/etl/sync_to_ck_task.py

@@ -72,8 +72,10 @@ def campaign_vx():
 
 
 if __name__ == '__main__':
-    order_sync_ck('2021-06-03')
+    # order_sync_ck('2021-06-03')
     # dw_order_channel_cost_sync_ck(dt.get_n_days(-2))
-    # for i in dt.getDateLists('2019-03-18','2020-12-17'):
+    for i in dt.getDateLists('2021-09-29','2021-10-06'):
     # #     order(i)
     #     dw_order_channel_sync_ck(i)
+        print(i)
+        order_sync_ck(i)

+ 130 - 5
example/bytedance_info_tmp_insert.py

@@ -1,25 +1,150 @@
 from model.DateUtils import DateUtils
-from model.DataBaseUtils import MysqlUtils
+from model.DataBaseUtils import MysqlUtils, CkUtils
 
 db = MysqlUtils()
 du = DateUtils()
+ck = CkUtils()
 
 
 def insert_data():
     bt_sql = '''
-    select b.pitcher,b.channel ,DATE_FORMAT(a.`date`, '%Y-%m-%d'),sum(cost) as cost,sum(view_count) as view_count ,
+    select b.pitcher,b.channel ,DATE_FORMAT(a.`date`, '%Y-%m-%d'),
+    round(sum(cost)/100,2) as cost,sum(view_count) as view_count ,
     sum(valid_click_count) as click_count from daily_tt a 
     left join bytedance_pitcher_change b
     on a.account_id =b.advertiser_id 
     group by b.pitcher ,a.`date` ,b.channel 
     '''
     byte_list = db.quchen_text.get_data_list(bt_sql)
-    for _ in byte_list:
-        print(_)
     isql = "insert into bytedance_info_tmp values (%s,%s,%s,%s,%s,%s)"
     db.dm.execute("truncate table bytedance_info_tmp")
     db.dm.executeMany(isql, byte_list)
 
 
+def dw_daily_bytedance_cost(ymd):
+    dt_sql = f'''
+    select b.pitcher,b.channel ,a.`date` ,round(sum(cost)/100,2) as cost,sum(view_count) as view_count ,
+    sum(valid_click_count) as click_count ,c.stage as stage,d.book as book, e.current_platform as platform
+    from daily_tt a 
+    left join bytedance_pitcher_change b on a.account_id =b.advertiser_id 
+    left join stage_change c on b.channel =c.channel 
+    left join book_change d on b.channel =d.name 
+    left join platform_change e on b.channel = e.name
+    where a.`date`='{ymd}'
+    group by b.pitcher ,a.`date` ,b.channel 
+    '''
+
+    data_list = db.quchen_text.get_data_list(dt_sql)
+    byte_list = []
+    for _ in data_list:
+        _[2] = str(_[2]) if _[2] else 0
+        _[3] = round(float(_[3]), 2) if _[3] else 0
+        _[4] = round(float(_[4]), 2) if _[4] else 0
+        _[5] = round(float(_[5]), 2) if _[5] else 0
+        _[6] = str(_[6]) if _[6] else ''
+        _[7] = str(_[7]) if _[7] else ''
+        _[8] = str(_[8]) if _[8] else ''
+        _.append('BYTEDANCE')
+        byte_list.append(tuple(_))
+    col = '''pitcher,channel,dt,cost,view_count,click_count,stage,book,platform,type'''
+    print(byte_list)
+    ck.execute(f"alter table dw_daily_bytedance_cost drop  partition '{ymd}' ")
+
+    ck.insertMany("dw_daily_bytedance_cost", col, tuple(byte_list))
+
+
+def replace_data():
+    channel_name='BYTEDANCE'
+    sql = f"""
+select
+       dt1,channel1,pitcher,stage,platform,book,'{channel_name}' type,
+       order_count,order_user,order_amount,
+       first_order_count,first_order_user,first_order_amount,
+       view_count,click_count,follow_user,
+       cost,reg_order_count,reg_order_user,reg_order_amount,
+       web_view_count,platform_view_count,web_order_count,
+       reg_order_user_again,
+       reg_order_user_again3,
+       reg_order_user_again4,
+       reg_order_user_again5,
+       reg_order_user_again6
+from (
+    select 
+    if(dt4='1970-01-01',dt,dt4) dt1,if(channel4='',channel,channel4) channel1,
+    pitcher,stage,platform,book,cost,view_count,click_count,   ---基础属性和消耗数据
+        follow_user,web_view_count,platform_view_count,web_order_count,'{channel_name}' type,
+        require_roi,require_mult,order_count,order_user,order_amount from
+
+        (select * from dw_daily_bytedance_cost) aa
+        left  join
+        (select date as dt4,channel as channel4,count(1) as order_count,   ---账面充值
+        count(distinct user_id) as order_user,sum(amount) as order_amount
+        from order where status=2    group by date,channel) dd
+        on dt=dt4 and channel=channel4) a
+
+left outer join (
+    select sum(if(user_order_count>=2,1,0)) reg_order_user_again,channel channel6,toDate(reg_date) dt6,  ---复冲人数
+           sum(if(user_order_count>=3,1,0)) reg_order_user_again3,
+           sum(if(user_order_count>=4,1,0)) reg_order_user_again4,
+           sum(if(user_order_count>=5,1,0)) reg_order_user_again5,
+           sum(if(user_order_count>=6,1,0)) reg_order_user_again6
+    from (select formatDateTime(reg_time,'%Y-%m-%d') reg_date,channel,count(1) user_order_count
+    from order where status=2  group by formatDateTime(reg_time,'%Y-%m-%d') ,user_id,channel) x group by reg_date,channel
+    ) f on dt1=dt6 and channel1=channel6
+
+left outer join
+   (
+       select toDate(formatDateTime(reg_time,'%Y-%m-%d')) as dt2,    ---新用户累计充值数据
+   channel as channel2,
+   sum(amount) as reg_order_amount,
+   count(distinct user_id) as reg_order_user,
+   count(1) as reg_order_count
+   from order where status=2 and reg_time>'2019-03-18 00:00:00' group by toDate(formatDateTime(reg_time,'%Y-%m-%d')),channel) b
+    on dt1=dt2 and channel1=channel2
+left outer join
+     (select date as dt3,channel as channel3,count(1) as first_order_count,          ---新用户首日充值
+     count(distinct user_id) as first_order_user,sum(amount) as first_order_amount
+    from order where status=2 and toDate(reg_time)=date  group by date,channel) c
+    on dt1=dt3 and channel1=channel3    
+
+    having order_amount+cost+reg_order_amount>0"""
+
+    data = ck.execute(sql)
+    print(data)
+
+    isql = "replace into dw_channel values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
+    # db.dm.execute("truncate table dw_channel")
+    db.dm.executeMany(isql, data)
+
+def platform_data_sum(ymd):
+    ck.execute("alter table dw_daily_platform_cost drop  partition '{}' ".format(ymd))
+    sql=f'''
+        insert into dw_daily_platform_cost
+        select * from 
+        (select * from dw_daily_bytedance_cost a where dt='{ymd}'
+        union all 
+        select * from dw_daily_channel_cost b where dt='{ymd}'
+        AND channel not  in (select channel from dw_daily_bytedance_cost a 
+        where dt='{ymd}'))
+    '''
+    ck.execute(sql)
+
+
 if __name__ == '__main__':
-    insert_data()
+    dt = DateUtils()
+
+    # insert_data()
+    for i in dt.getDateLists('2021-09-18','2021-09-23'):
+        print(i)
+        dw_daily_bytedance_cost(i)
+        platform_data_sum(i)
+    exit()
+    # replace_data()
+    for i in dt.getDateLists('2019-03-18','2021-10-17'):
+        print(i)
+        platform_data_sum(i)
+
+
+
+
+

+ 4 - 6
example/get_ad_cost_min.py

@@ -147,16 +147,14 @@ def get_data():
             get_ad_data()
             logging.info('获取数据,结束')
             logging.info('dw_image_cost 进行数据更新')
-
-            dw_image_cost_day.run(du.get_n_days(0))
-            logging.info('dw_image_cost 进行数据更新,结束')
-            time.sleep(60*3)
             global db
             db.close()
             db = MysqlUtils()
+            dw_image_cost_day.run(du.get_n_days(0))
+            logging.info('dw_image_cost 进行数据更新,结束')
+            time.sleep(60*3)
         except Exception as e:
-            raise
-            print(e)
+            print('error',e)
             time.sleep(60)
 
 

+ 101 - 12
example/update_cost_data.py

@@ -1,5 +1,5 @@
 from model.DateUtils import DateUtils
-from app.api_data.tx_ad_cost import get_cost_older
+from app.api_data.tx_ad_cost import get_cost_older,get_cost
 from app.etl.data_stat_run import do_cost
 from app.etl.dm.dm_pitcher_daily_overview import dm_pitcher_daily_overview
 from app.etl.dw.dw_channel_daily import dw_channel_daily
@@ -34,6 +34,94 @@ def get_data(st, et):
     book_trend()
     dm_pitcher_daily_overview()
 
+def get_data_vx_adinfo(channel, st, et):
+    # 用于处理单个微信号相关信息
+    sql = '''
+            select account_id,wechat_account_id,access_token,refresh_token,name,
+            ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx
+            where account_id in (select account_id from channel_by_account_daily
+            where channel ='{}'  and dt in (select max(dt) from channel_by_account_daily cbad)
+            order by dt desc  )
+                '''.format(channel)
+
+    token_list_v = db.quchen_text.getData(sql)
+    print(token_list_v)
+    account_id=token_list_v[0][0]
+    access_token=token_list_v[0][2]
+    type='MP'
+
+    #获取广告id
+    get_cost.get_ad_cost_day(account_id, access_token, type, st, et)
+
+
+    for dt in du.getDateLists(st,et):
+        #获取广告基础信息
+        sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(ad_id)  from ad_cost_day  a
+            left join (
+            select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+            select account_id,access_token,'GDT' type  from advertiser_qq where (name !='' or name is not null)
+            ) b on a.account_id=b.account_id
+            where a.dt='{dt}' and a.account_id ='{account_id}'
+            GROUP BY b.account_id,b.access_token,b.type"""
+
+        accounts = db.quchen_text.getData(sql)
+
+        for account in accounts:
+            get_cost.get_ad_info(account[0], account[1], account[2], account[3], dt)
+
+        #获取广告创意素材
+        sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(adcreative_id)  from ad_info  a
+                    left join (
+                    select account_id,access_token,'MP' type  from advertiser_vx where  (name !='' or name is not null) union 
+                    select account_id,access_token,'GDT' type from advertiser_qq where (name !='' or name is not null)
+                    ) b on a.account_id=b.account_id
+                    where a.dt='{dt}' and a.account_id='{account_id}'
+                    GROUP BY b.account_id,b.access_token,b.type having b.account_id is not null """
+
+        accounts = db.quchen_text.getData(sql)
+
+        for account in accounts:
+            get_cost.get_adcreatives(account[0], account[1], account[2], account[3], dt)
+
+        #获取广告计划信息
+        sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(campaign_id)  from ad_info  a
+                    left join (
+                    select account_id,access_token,'MP' type  from advertiser_vx where  (name !='' or name is not null) union 
+                    select account_id,access_token,'GDT' type from advertiser_qq where  (name !='' or name is not null)
+                    ) b on a.account_id=b.account_id
+                    where a.dt='{dt}' and a.account_id='{account_id}'
+                    GROUP BY b.account_id,b.access_token,b.type having b.account_id is not null """
+
+        accounts = db.quchen_text.getData(sql)
+
+        for account in accounts:
+            get_cost.get_campaign(account[0], account[1], account[2], account[3], dt)
+
+        #获取图片信息
+        sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(image_id)  from adcreative_info  a
+                    left join (
+                    select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+                    select account_id,access_token,'GDT' type from advertiser_qq where (name !='' or name is not null)
+                    ) b on a.account_id=b.account_id
+                    where a.dt='{dt}' and a.account_id='{account_id}' and a.is_video=0
+                    GROUP BY b.account_id,b.access_token,b.type"""
+        accounts = db.quchen_text.getData(sql)
+        for account in accounts:
+            get_cost.images_info_get( account[0], account[1], account[3])
+
+        #获取视频信息
+        sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(image_id)  from adcreative_info  a
+                    left join (
+                    select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+                    select account_id,access_token,'GDT' type from advertiser_qq where (name !='' or name is not null)
+                    ) b on a.account_id=b.account_id
+                    where a.dt='{dt}' and a.account_id='{account_id}' and a.is_video=1
+                    GROUP BY b.account_id,b.access_token,b.type"""
+        accounts = db.quchen_text.getData(sql)
+        for account in accounts:
+            get_cost.video_info_get( account[0], account[1], account[3])
+
+
 
 def get_data_vx(channel, st, et):
     # 用于处理单个微信号相关信息
@@ -55,12 +143,12 @@ def get_data_vx(channel, st, et):
         print(_)
     print('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
     mysql_insert_daily_vx(li)
-    # do_cost(st, et)
-    # src_book_info()  # 书籍卡点信息
-    # dw_channel_daily()
-    # dw_pitcher_trend()
-    # book_trend()
-    # dm_pitcher_daily_overview()
+    do_cost(st, et)
+    src_book_info()  # 书籍卡点信息
+    dw_channel_daily()
+    dw_pitcher_trend()
+    book_trend()
+    dm_pitcher_daily_overview()
 
 
 def get_data_gdt(channel, st, et):
@@ -102,10 +190,11 @@ if __name__ == "__main__":
         level=logging.INFO,
         format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
     )
-    st = du.get_n_days(-50)
-    et = du.get_n_days(-10)
-    # get_data_vx(channel='落枫文海', st=st, et=et)
+    st = du.get_n_days(-365)
+    et = du.get_n_days(0)
+    # get_data_vx(channel='丹青文阅', st=st, et=et)
     # get_data_gdt(channel='落枫文海', st=st, et=et)
     # get_data(st,et)
-    from app.etl import data_stat_task
-    data_stat_task.dw_daily_channel_cost('2019-07-12')
+    # from app.etl import data_stat_task
+    # data_stat_task.dw_daily_channel_cost('2019-07-12')
+    get_data_vx_adinfo(channel='丹青文阅', st='2021-06-01', et='2021-06-05')

+ 19 - 12
example/update_order_data.py

@@ -56,17 +56,17 @@ def order_date_get(st, et):
 def update_order():
     # 1.获取数据
 
-    st = du.get_n_days(-30)
+    st = du.get_n_days(-1)
     et = du.get_n_days(0)
     # print(st, et)
-    order_date_get(st, et)
-    # do_order(st, et)
-    # src_book_info()  # 书籍卡点信息
-    # # book_annual_expect_profit.run() # 年预期收益
-    # dw_channel_daily()
-    # dw_pitcher_trend()
-    # book_trend()
-    # dm_pitcher_daily_overview()
+    # order_date_get(st, et)
+    do_order(st, et)
+    src_book_info()  # 书籍卡点信息
+    # book_annual_expect_profit.run() # 年预期收益
+    dw_channel_daily()
+    dw_pitcher_trend()
+    book_trend()
+    dm_pitcher_daily_overview()
 
 
 def update_order_zhangzhognyun():
@@ -81,8 +81,15 @@ def update_order_zhangzhongyun_single():
 
 
 def update_order_guofeng():
-    guofeng(start=du.get_n_days(-30), end=du.get_n_days(0))
+    guofeng(start=du.get_n_days(-180), end=du.get_n_days(0))
 
+def update_order_guofeng_alone():
+    import re
+    account='87522768,rSBu9iyZ3zqKQ8Qt,1042064,趣程31期,qucheng31qi31@163.com'
+
+    account=account.replace('\n', '').split(",")
+    print(account)
+    get_gf_order_task(start=du.get_n_days(-180), end=du.get_n_days(0), account=account)
 
 def update_order_do():
     st = du.get_n_days(-30)
@@ -105,8 +112,8 @@ if __name__ == '__main__':
     )
     # yangguang.get_channel_info()
     # update_order()
-    # update_order()
     # update_order_zhangzhognyun()
     # update_order_zhangzhognyun()
     # update_order_zhangzhongyun_single()
-    update_order_guofeng()
+    # update_order_guofeng()
+    update_order_guofeng_alone()