cxyu 3 anni fa
parent
commit
05eaf7c76b

+ 13 - 7
app/api_data/tx_ad_cost/get_cost.py

@@ -133,6 +133,9 @@ def run(dt):
         logging.info('获取广告创意素材,开始')
         adcreative(dt)
         logging.info('获取广告创意素材,结束')
+        logging.info('获取广告计划信息,开始')
+        campaign(dt)
+        logging.info('获取广告计划信息,结束')
         logging.info('获取图片信息,开始')
         image(dt)
         logging.info('获取图片信息,结束')
@@ -184,12 +187,15 @@ if __name__ == '__main__':
     # adcreative('2021-05-11')
     # video('2021-05-14')
     # campaign('2021-05-14')
-    ad_cost_day(du.get_n_days(-365), du.get_n_days(0))
-
-    for dt in list(reversed(du.getDateLists(du.get_n_days(-500), du.get_n_days(0)))):
-        print(dt)
-        # run(dt)
-        # ad(dt)
-        # adcreative(dt)
+    # ad_cost_day(du.get_n_days(-365), du.get_n_days(0))
+
+    # for dt in list(du.getDateLists(du.get_n_days(-239), du.get_n_days(0))):
+    #     print(dt)
+    #     campaign(dt)
+    # run(dt)
+    # ad(dt)
+    # adcreative(dt)
     # image(dt)
     # video(dt)
+    # campaign(du.get_n_days(0))
+    run(du.get_n_days(0))

+ 89 - 21
app/etl/dw/dw_image_cost_day.py

@@ -4,6 +4,7 @@ from model.DateUtils import DateUtils
 from model.DingTalkUtils import DingTalkUtils
 from datetime import datetime
 import logging
+
 # logging.getLogger().setLevel(logging.WARNING)
 
 
@@ -13,16 +14,19 @@ du = DateUtils()
 
 
 def run(dt):
-    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
-            title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
+    sql = f"""
+            SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
+            title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
+            g.last_modified_time,g.campaign_id 
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
             left join adcreative_info c on b.adcreative_id=c.adcreative_id
             left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
             left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
-            where a.dt='{dt}'  and c.is_video=0
-            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher  
+            left join campaign_info g on b.campaign_id = g.campaign_id 
+            where a.dt='{dt}'  and c.is_video=0 and g.campaign_id is not null
+            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
             
             """
 
@@ -36,17 +40,31 @@ def run(dt):
     li = []
     for i in data:
         # print(i)
-        li.extend(i[-1].split(','))
-    # TODO:之后如果一天产生的图片过多,可能超过sql的字符限制,
+        li.extend(i[16].split(','))
+    # TODO:之后如果一天产生的图片过多,可能超过sql的字符限制
     # 之后数据使用hive,来进行数据存储
 
     sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where  image_id in ({str(set(li))[1:-1]})"
 
     image_di = {}
     image_data = db.quchen_text.getData(sql3)
+    signature_dict = {}  # key signature_id  v:(pitcher,last_modified_time)
     for x in image_data:
         image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
 
+    for i in data:
+        signature_tmp = ''
+        for j in i[16].split(','):
+            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
+        signature_tmp = signature_tmp[1:]
+        if signature_tmp not in signature_dict.keys():
+            signature_dict[signature_tmp] = (i[15], i[17], i[18])
+        else:
+            sig_last_modified_time = signature_dict[signature_tmp][1]
+            if sig_last_modified_time is None:
+                signature_dict[signature_tmp] = (i[15], i[17], i[18])
+            elif i[17] is not None and i[17] < sig_last_modified_time:
+                signature_dict[signature_tmp] = (i[15], i[17], i[18])
     # print(image_di)
 
     for i in data:
@@ -61,7 +79,7 @@ def run(dt):
         video_bit_rate = None
         video_meta_data = None
         download_path = None
-        for j in i[-1].split(','):
+        for j in i[16].split(','):
             if image_di.get(j):
                 image_id = image_id + ',' + j
                 preview_url = preview_url + ',' + image_di.get(j)[0]
@@ -78,9 +96,12 @@ def run(dt):
                 height = '0'
                 size = size + ',' + '0'
                 type = type + ',' + ' '
-        i[-1] = image_id[1:]
+        signature = signature[1:]
+        owner = signature_dict[signature][0]
+        campaign_id = signature_dict[signature][2]
+        i[16] = image_id[1:]
         i.append(preview_url[1:])
-        i.append(signature[1:])
+        i.append(signature)
         i.append(0)
         i.append(width)
         i.append(height)
@@ -90,11 +111,35 @@ def run(dt):
         i.append(video_bit_rate)
         i.append(video_meta_data)
         i.append(download_path)
+        i.append(campaign_id)
+        i.append(owner)
+    data_new = []
+    for i in data:
+        i = i[:17] + i[19:]
+        data_new.append(i)
+    data = data_new
+
 
     # exit(0)
-    sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
+    sql_video = f""" 
+            select foo.*,foo2.pitcher as owner from
+            (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
-            g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
+            g.width,g.height,g.`size` ,g.`type` as video_type ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
+            ,min(h.last_modified_time) as last_modified_time , h.campaign_id 
+            from 
+            ad_cost_day a 
+            left join ad_info b on a.ad_id=b.ad_id
+            left join adcreative_info c on b.adcreative_id=c.adcreative_id
+            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
+            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
+            left join video_info g on c.image_id=g.video_id
+            left join campaign_info h on b.campaign_id = h.campaign_id 
+            where a.dt='{dt}'  and c.is_video=1 and h.campaign_id  is not null
+            group by a.dt,b.type,title,description,
+            book,platform,stage,image_id,e.channel,pitcher) as foo
+            inner join             
+            (select pitcher,min(h.last_modified_time) as last_modified_time  
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -102,31 +147,51 @@ def run(dt):
             left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
             left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
             left join video_info g on c.image_id=g.video_id
-            where a.dt='{dt}'  and c.is_video=1
-            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
+            left join campaign_info h on b.campaign_id = h.campaign_id 
+            where a.dt='{dt}'  and c.is_video=1 and h.campaign_id  is not null
+            group by pitcher,h.last_modified_time ) as foo2 
+            on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
+            """
 
     data_video = db.quchen_text.get_data_list(sql_video)
-    data.extend(data_video)
+    data_new = []
+    for i in data_video:
+        i = i[:-3] + i[-2:]
+        data_new.append(i)
+    data.extend(data_new)
+
 
     # 进行数据存储
+
     db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
     db.dm.executeMany(
         '''replace into dw_image_cost_day 
         (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
         order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
         preview_url,signature,is_video,width,height,size,format,video_length,
-        video_bit_rate,video_meta_data,download_path)
-        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
+        video_bit_rate,video_meta_data,download_path,campaign_id,owner)
+        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
         data)
 
+    # # 图片数据归属权改变
+    # # 大于五天的数据进行,消耗超过5000的owner置位null
+    # owner_sql = '''
+    #     UPDATE dw_image_cost_day
+    #     set owner = null
+    #     WHERE
+    #     dt<date_add(now(),interval -5 day) or cost>5000
+    # '''
+    # db.dm.execute(owner_sql)
+
+
     # ck对应数据也保存一份
     # 1.进行当天相关的分区删除
-    ck.client.execute(f''' alter table dw_image_cost_day drop partition '{dt}' ''' )
+    ck.client.execute(f''' alter table dw_image_cost_day drop partition '{dt}' ''')
     col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
            'follow_count', 'order_count', 'order_amount', 'title', 'description',
            'book', 'platform', 'stage', 'channel', 'pitcher', 'image_id', 'preview_url',
            'signature', 'is_video', 'width', 'height', 'size', 'format', 'video_length',
-           'video_bit_rate', 'video_meta_data', 'download_path']
+           'video_bit_rate', 'video_meta_data', 'download_path','campaign_id', 'owner']
     # ck存入前进行数据格式化
     for _ in data:
         # data= [col if col else 'null' for col in data ]
@@ -151,6 +216,7 @@ def run(dt):
     ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
     logging.info('ck填充数据,结束')
 
+
 def hourly():
     try:
         logging.info('广告数据清洗,开始')
@@ -183,9 +249,11 @@ if __name__ == '__main__':
         level=logging.INFO,
         format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
     )
-    #-495
+    # -495
+    #
     # for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
     #     print(i)
     #     run(i)
-    print(du.getNow())
-    run(du.get_n_days(-1))
+
+    # print(du.get_n_days(-20))
+    run(du.get_n_days(0))

+ 0 - 1
data_processing/video_processing.py

@@ -8,7 +8,6 @@ import oss2
 from model.sql_models import DB
 from config import using_config as db_config
 
-
 def change_format(video_url, file_md5=None):
     now_time = int(time.time() * 1000)
     input_file = '/tmp/{}.mp4'.format(now_time)

+ 13 - 10
example/update_cost_data.py

@@ -37,11 +37,12 @@ def get_data(st, et):
 
 def get_data_vx(channel, st, et):
     # 用于处理单个微信号相关信息
-    sql = '''select account_id,wechat_account_id,access_token,refresh_token,name,
-                ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx
-                where account_id in (select account_id from channel_by_account_daily
-                where channel ='{}' 
-                order by dt desc  )
+    sql = '''
+            select account_id,wechat_account_id,access_token,refresh_token,name,
+            ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx
+            where account_id in (select account_id from channel_by_account_daily
+            where channel ='{}'  and dt in (select max(dt) from channel_by_account_daily cbad)
+            order by dt desc  )
                 '''.format(channel)
 
     token_list_v = db.quchen_text.getData(sql)
@@ -67,7 +68,7 @@ def get_data_gdt(channel, st, et):
     sql = '''select account_id,'',access_token,refresh_token,name,
                 ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq
                 where account_id in (select account_id from channel_by_account_daily
-                where channel ='{}' 
+                where channel ='{}' and dt in (select max(dt) from channel_by_account_daily cbad)
                 order by dt desc  )
                 '''.format(channel)
     print(sql)
@@ -101,8 +102,10 @@ if __name__ == "__main__":
         level=logging.INFO,
         format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
     )
-    st = du.get_n_days(-20)
-    et = du.get_n_days(0)
-    get_data_vx(channel='语慕书海', st=st, et=et)
-
+    st = du.get_n_days(-50)
+    et = du.get_n_days(-10)
+    # get_data_vx(channel='落枫文海', st=st, et=et)
+    # get_data_gdt(channel='落枫文海', st=st, et=et)
     # get_data(st,et)
+    from app.etl import data_stat_task
+    data_stat_task.dw_daily_channel_cost('2019-07-12')