Browse Source

MOD:素材排行榜,图片归属人逻辑修改

cxyu 3 years ago
parent
commit
2726130ce5
1 changed file with 86 additions and 28 deletions
  1. 86 28
      app/etl/dw/dw_image_cost_day.py

+ 86 - 28
app/etl/dw/dw_image_cost_day.py

@@ -15,9 +15,10 @@ du = DateUtils()
 
 def run(dt):
     sql = f"""
-            SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
+            SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),
+            sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
-            g.last_modified_time,g.campaign_id 
+            g.created_time,g.campaign_id 
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -43,15 +44,13 @@ def run(dt):
         li.extend(i[16].split(','))
     # TODO:之后如果一天产生的图片过多,可能超过sql的字符限制
 
-    # TODO:归属人数据有问题
-
     # 之后数据使用hive,来进行数据存储
 
     sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where  image_id in ({str(set(li))[1:-1]})"
 
     image_di = {}
     image_data = db.quchen_text.getData(sql3)
-    signature_dict = {}  # key signature_id  v:(pitcher,last_modified_time)
+    signature_dict = {}  # key signature_id  v:(pitcher,created_time)
     for x in image_data:
         image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
 
@@ -63,12 +62,34 @@ def run(dt):
         if signature_tmp not in signature_dict.keys():
             signature_dict[signature_tmp] = (i[15], i[17])
         else:
-            sig_last_modified_time = signature_dict[signature_tmp][1]
-            if sig_last_modified_time is None:
+            sig_created_time = signature_dict[signature_tmp][1]
+            if sig_created_time is None:
                 signature_dict[signature_tmp] = (i[15], i[17])
-            elif i[17] is not None and i[17] < sig_last_modified_time:
+            elif i[17] is not None and i[17] < sig_created_time:
                 signature_dict[signature_tmp] = (i[15], i[17])
-    # print(image_di)
+    # 1.通过signature找到数据库中最老的signature对应的pitcher
+    signature_list = "'" + "','".join([str(i) for i in signature_dict.keys()]) + "'"
+    sql = f'''
+    select owner ,b.signature as signature  from 
+    (select min(dt) as dt,signature from dw_image_cost_day dicd 
+    where dt<'{dt}' and length (signature)>1
+    and signature in ({signature_list})
+    group by signature
+    ) as b
+    inner join(
+    select * from 
+    dw_image_cost_day 
+    where dt<'{dt}' and length (signature)>1
+     and signature in ({signature_list})
+    ) as a
+    on a.dt=b.dt and a.signature = b.signature
+    group by signature,owner
+    '''
+    signature_info = db.dm.get_data_list(sql)
+    for i in signature_info:
+        owner, signature = i
+        signature_dict[signature] = (owner, signature_dict[signature][1])
+    # 2.需要对应处理如果signature为null,或者,,,,这种,设置为投手本人
 
     for i in data:
         preview_url = ''
@@ -100,7 +121,11 @@ def run(dt):
                 size = size + ',' + '0'
                 type = type + ',' + ' '
         signature = signature[1:]
-        owner = signature_dict[signature][0]
+        pitcher = i[15]
+        if len(signature.replace(' ', '').replace(',', '')) == 0:
+            owner = pitcher
+        else:
+            owner = signature_dict[signature][0]
         i[16] = image_id[1:]
         i.append(preview_url[1:])
         i.append(signature)
@@ -122,14 +147,11 @@ def run(dt):
         data_new.append(i)
     data = data_new
 
-
-    # exit(0)
-    sql_video = f""" 
-            select foo.*,foo2.pitcher as owner from
+    sql_video = f"""  select foo.*,if(foo2.pitcher,foo2.pitcher,foo.pitcher) as owner from
             (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
             title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
             g.width,g.height,g.`size` ,g.`type` as video_type ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
-            ,min(h.last_modified_time) as last_modified_time , h.campaign_id 
+            ,min(h.created_time) as created_time , h.campaign_id 
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -141,7 +163,16 @@ def run(dt):
             where a.dt='{dt}'  and c.is_video=1 and h.campaign_id  is not null
             group by h.campaign_id) as foo
             inner join             
-            (select pitcher,min(h.last_modified_time) as last_modified_time  
+            (select signature,pitcher from ad_cost_day a 
+            left join ad_info b on a.ad_id=b.ad_id
+            left join adcreative_info c on b.adcreative_id=c.adcreative_id
+            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
+            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
+            left join video_info g on c.image_id=g.video_id
+            left join campaign_info h on b.campaign_id = h.campaign_id 
+            where a.dt='{dt}'  and c.is_video=1 and h.campaign_id  is not null
+            and (signature,h.created_time) in 
+            (select signature,min(h.created_time) as created_time  
             from 
             ad_cost_day a 
             left join ad_info b on a.ad_id=b.ad_id
@@ -151,18 +182,46 @@ def run(dt):
             left join video_info g on c.image_id=g.video_id
             left join campaign_info h on b.campaign_id = h.campaign_id 
             where a.dt='{dt}'  and c.is_video=1 and h.campaign_id  is not null
-            group by pitcher,h.last_modified_time ) as foo2 
-            on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
+            and length (signature)>6
+            group by signature)
+            group by signature ,pitcher ) as foo2 
+            on  foo.signature=foo2.signature
             """
-
     data_video = db.quchen_text.get_data_list(sql_video)
+
+    signature_list = "'" + "','".join([str(i[18]) for i in data if i[18] and len(i[18]) > 6]) + "'"
+
+    sql = f'''
+    select owner ,b.signature as signature  from 
+    (select min(dt) as dt,signature from dw_image_cost_day dicd 
+    where dt<'{dt}' and length (signature)>1
+    and signature in ({signature_list})
+    group by signature
+    ) as b
+    inner join(
+    select * from 
+    dw_image_cost_day 
+    where dt<'{dt}' and length (signature)>1
+     and signature in ({signature_list})
+    ) as a
+    on a.dt=b.dt and a.signature = b.signature
+    group by signature,owner
+    '''
+    signature_info = db.dm.get_data_list(sql)
+    signature_dict_video = {}
+    for i in signature_info:
+        owner, signature = i
+        signature_dict_video[signature] = owner
+
     data_new = []
     for i in data_video:
         i = i[:-3] + i[-2:]
+        signature = i[18]
+        if signature in signature_dict_video.keys():
+            i[-1] = signature_dict_video[signature]
         data_new.append(i)
     data.extend(data_new)
     # 进行数据存储
-
     db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
     db.dm.executeMany(
         '''replace into dw_image_cost_day 
@@ -174,7 +233,6 @@ def run(dt):
         data)
 
 
-
 def hourly():
     try:
         logging.info('广告数据清洗,开始')
@@ -209,12 +267,12 @@ if __name__ == '__main__':
     )
     # -495
     #
-    # for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
-    #     print(i)
-    #     # exit()
-    #     run(i)
+    for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
+        print(i)
+        # exit()
+        run(i)
 
     # print(du.get_n_days(-20))
-    run(du.get_n_days(0))
-
-
+    # run(du.get_n_days(0))
+    # print(du.get_n_days(-30))
+    # run(du.get_n_days(-30))