import logging

from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
# logging.getLogger().setLevel(logging.WARNING)
import pandas as pd

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    """Build the dw_image_cost_day rows for one day `dt` ('YYYY-MM-DD').

    Aggregates image ads (is_video=0) and video ads (is_video=1) from the
    quchen_text MySQL database, enriches image ads with per-image metadata
    (preview url, signature, width/height, size, type), then replaces that
    day's rows in dw_image_cost_day.

    NOTE(review): `dt` is interpolated into SQL with an f-string; callers
    only pass internally generated dates, but parameterized queries would
    be safer — confirm the DB wrapper supports them.
    """
    # Image ads: one aggregated row per (dt, type, creative, channel, pitcher).
    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),
    sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
    from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    where a.dt='{dt}' and c.is_video=0
    group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
    data = db.quchen_text.get_data_list(sql)

    # Collect every image_id referenced by today's rows (last column is a
    # comma-separated id list) so their metadata can be fetched in one query.
    all_image_ids = []
    for row in data:
        all_image_ids.extend(row[-1].split(','))

    # TODO: if one day produces too many images this IN-clause may exceed
    # MySQL's max statement size; move this data into Hive later.
    image_di = {}
    if all_image_ids:  # empty list would render an invalid IN () clause
        in_clause = ','.join(f"'{img}'" for img in set(all_image_ids))
        sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({in_clause})"
        for x in db.quchen_text.getData(sql3):
            image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])

    # Expand each aggregated row with the metadata of all its images,
    # joined back into comma-separated strings (order follows the id list).
    for row in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        img_type = ''
        video_length = None
        video_bit_rate = None
        for img in row[-1].split(','):
            info = image_di.get(img)
            if info:
                image_id = image_id + ',' + img
                preview_url = preview_url + ',' + info[0]
                signature = signature + ',' + info[1]
                # NOTE(review): width/height keep only the LAST image's value,
                # unlike the comma-joined fields — looks intentional for the
                # downstream table layout, but confirm.
                width = str(info[2])
                height = str(info[3])
                size = size + ',' + str(info[4])
                img_type = img_type + ',' + str(info[5])
            else:
                # Unknown image: keep a placeholder so column counts line up.
                image_id = image_id + ',' + img
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                img_type = img_type + ',' + ' '
        # Strip the leading comma from every joined field.
        row[-1] = image_id[1:]
        row.append(preview_url[1:])
        row.append(signature[1:])
        row.append(0)  # is_video flag: 0 for image ads
        row.append(width[1:])
        row.append(height[1:])
        row.append(size[1:])
        row.append(img_type[1:])
        row.append(video_length)
        row.append(video_bit_rate)

    # Video ads: metadata comes straight from video_info, so the rows are
    # already in the final 26-column shape (constant 1 = is_video flag).
    sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),
    sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
    g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate
    from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join video_info g on c.image_id=g.video_id
    where a.dt='{dt}' and c.is_video=1
    group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
    data.extend(db.quchen_text.get_data_list(sql_video))

    # Persist: idempotent per-day reload (delete then replace-insert).
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
        data)


def hourly():
    """Refresh today's rows; alert via DingTalk on any failure."""
    try:
        logging.info('广告数据清洗,开始')
        run(du.getNow())
        logging.info('广告数据清洗,结束')
    except Exception:
        # Narrowed from a bare except (which also caught KeyboardInterrupt)
        # and log the traceback before alerting.
        logging.exception('广告数据清洗失败')
        DingTalkUtils().send("广告数据清洗失败")


def day():
    """Backfill the last 10 days (inclusive of yesterday)."""
    logging.info('广告数据清洗,开始')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        run(i)
    logging.info('广告数据清洗,结束')


if __name__ == '__main__':
    # run('2021-05-18')
    for i in du.getDateLists(du.get_n_days(-3), du.get_n_days(0)):
        print(i)
        run(i)