from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)
db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    # Non-video creatives: daily cost/engagement aggregated per campaign
    sql = f"""
        SELECT a.dt,a.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),
               sum(follow_count),sum(order_count),sum(order_amount),
               title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
               g.created_time,a.campaign_id
        from ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join campaign_info g on a.campaign_id = g.campaign_id
        where a.dt='{dt}' and (c.is_video=0 or c.is_video is null) and a.campaign_id is not null
        group by a.campaign_id
    """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)

    # Collect every image_id referenced by the rows (image_id is a comma-separated list)
    li = []
    for i in data:
        li.extend(i[16].split(','))

    # TODO: if too many images are produced in one day, this may exceed the SQL length limit;
    #       the data should eventually be stored in Hive instead
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    image_di = {}
    image_data = db.quchen_text.getData(sql3)

    signature_dict = {}  # key: signature, value: (pitcher, created_time)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])

    # For each concatenated signature, keep the pitcher of the campaign with the earliest created_time
    for i in data:
        signature_tmp = ''
        for j in i[16].split(','):
            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
        signature_tmp = signature_tmp[1:]
        if signature_tmp not in signature_dict.keys():
            signature_dict[signature_tmp] = (i[15], i[17])
        else:
            sig_created_time = signature_dict[signature_tmp][1]
            if sig_created_time is None:
                signature_dict[signature_tmp] = (i[15], i[17])
            elif i[17] is not None and i[17] < sig_created_time:
                signature_dict[signature_tmp] = (i[15], i[17])

    # 1. For each signature, find the pitcher (owner) of its oldest occurrence in dw_image_cost_day
    signature_list = "'" + "','".join([str(i) for i in signature_dict.keys()]) + "'"
    sql = f'''
        select owner, b.signature as signature from
            (select min(dt) as dt, signature from dw_image_cost_day dicd
             where dt<'{dt}' and length(signature)>1
               and signature in ({signature_list})
             group by signature) as b
        inner join
            (select * from dw_image_cost_day
             where dt<'{dt}' and length(signature)>1
               and signature in ({signature_list})) as a
        on a.dt=b.dt and a.signature = b.signature
        group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    for i in signature_info:
        owner, signature = i
        signature_dict[signature] = (owner, signature_dict[signature][1])

    # 2. If the signature is null or effectively empty (e.g. ",,,,"), fall back to the row's own pitcher
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        type = ''
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[16].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                type = type + ',' + str(image_di.get(j)[5])
            else:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                type = type + ',' + ' '
        signature = signature[1:]
        pitcher = i[15]
        if len(signature.replace(' ', '').replace(',', '')) == 0:
            owner = pitcher
        else:
            owner = signature_dict[signature][0]
        i[16] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature)
        i.append(0)  # is_video = 0
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
        i.append(i[18])  # campaign_id
        i.append(owner)

    # Drop g.created_time and the original campaign_id position (campaign_id was re-appended
    # above) so the row layout matches the dw_image_cost_day insert below
    data_new = []
    for i in data:
        i = i[:17] + i[19:]
        # print(i)
        data_new.append(i)
    data = data_new

    # Video creatives: same aggregation; owner defaults to the pitcher whose campaign
    # has the earliest created_time for that signature on the day
    sql_video = f"""
        select foo.*, if(foo2.pitcher,foo2.pitcher,foo.pitcher) as owner from
        (SELECT a.dt,a.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),
                sum(order_count),sum(order_amount),
                title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
                g.preview_url,g.signature,1,
                g.width,g.height,g.`size`,g.`type` as video_type,g.video_length,g.byte_rate,
                g.video_meta_data,g.download_path,
                min(h.created_time) as created_time, a.campaign_id
         from ad_cost_day a
         left join ad_info b on a.ad_id=b.ad_id
         left join adcreative_info c on b.adcreative_id=c.adcreative_id
         left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
         left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
         left join video_info g on c.image_id=g.video_id
         left join campaign_info h on a.campaign_id = h.campaign_id
         where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
         group by a.campaign_id) as foo
        left join
        (select signature,pitcher
         from ad_cost_day a
         left join ad_info b on a.ad_id=b.ad_id
         left join adcreative_info c on b.adcreative_id=c.adcreative_id
         left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
         left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
         left join video_info g on c.image_id=g.video_id
         left join campaign_info h on a.campaign_id = h.campaign_id
         where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
           and (signature,h.created_time) in
               (select signature,min(h.created_time) as created_time
                from ad_cost_day a
                left join ad_info b on a.ad_id=b.ad_id
                left join adcreative_info c on b.adcreative_id=c.adcreative_id
                left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
                left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
                left join video_info g on c.image_id=g.video_id
                left join campaign_info h on a.campaign_id = h.campaign_id
                where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
                  and length(signature)>6
                group by signature)
         group by signature,pitcher) as foo2
        on foo.signature=foo2.signature
    """
    data_video = db.quchen_text.get_data_list(sql_video)

    # As for images, override the owner with the earliest historical owner of the same signature
    signature_list = "'" + "','".join([str(i[18]) for i in data_video if i[18] and len(i[18]) > 6]) + "'"
    sql = f'''
        select owner, b.signature as signature from
            (select min(dt) as dt, signature from dw_image_cost_day dicd
             where dt<'{dt}' and length(signature)>1
               and signature in ({signature_list})
             group by signature) as b
        inner join
            (select * from dw_image_cost_day
             where dt<'{dt}' and length(signature)>1
               and signature in ({signature_list})) as a
        on a.dt=b.dt and a.signature = b.signature
        group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    signature_dict_video = {}
    for i in signature_info:
        owner, signature = i
        signature_dict_video[signature] = owner

    data_new = []
    for i in data_video:
        i = i[:-3] + i[-2:]  # drop created_time, keep campaign_id and owner
        signature = i[18]
        if signature in signature_dict_video.keys():
            i[-1] = signature_dict_video[signature]
        data_new.append(i)
    data.extend(data_new)

    # Persist: clear the day's partition, then write all image and video rows
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
           (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
            order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
            preview_url,signature,is_video,width,height,size,format,video_length,
            video_bit_rate,video_meta_data,download_path,campaign_id,owner)
           values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)


def hourly():
    try:
        logging.info('Ad data cleaning started')
        run(du.getNow())
        logging.info('Ad data cleaning finished')
    except Exception as e:
        logging.error(str(e))
        DingTalkUtils().send("Ad data cleaning failed\n" + str(e))


def day():
    logging.info('Ad data cleaning started')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('Ad data cleaning finished')


if __name__ == '__main__':
    # run('2021-05-18')
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also log to stdout
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )

    # -495
    for i in du.getDateLists(du.get_n_days(-365), du.get_n_days(0)):
        print(i)
        # exit()
        run(i)
    # print(du.get_n_days(-20))
    # run(du.get_n_days(0))
    # print(du.get_n_days(-30))
    # print(du.get_n_days(-3))
    # run(du.get_n_days(-3))
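
# Usage sketch (an assumption, not part of the job): a quick post-run sanity check on the
# day's partition, reusing db.dm.get_data_list the same way run() does above. The check()
# helper is hypothetical and kept commented out so importing this module never touches
# the database.
# def check(dt):
#     rows = db.dm.get_data_list(
#         f"select is_video, count(*), sum(cost) from dw_image_cost_day where dt='{dt}' group by is_video")
#     logging.info(f'dw_image_cost_day {dt}: {rows}')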