"""Populate ``dw_image_cost_day`` from the ad cost / creative tables.

``run(dt)`` rebuilds the rows of a single day; ``hourly`` and ``day`` are the
scheduler entry points that call it for the current day or a range of days.
"""
import logging

import pandas as pd

from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    """Rebuild the ``dw_image_cost_day`` rows for the day ``dt``.

    Steps:
      1. Aggregate cost metrics of creatives with ``is_video=0``; the last
         selected column is the (possibly comma-separated) ``image_id``.
      2. Look up ``preview_url`` / ``signature`` for every referenced image
         and append them, comma-joined, to each row, followed by the
         constant ``0``.
      3. Aggregate creatives with ``is_video=1`` (URL and signature come
         straight from ``video_info``, last column is the constant ``1``).
      4. Delete the existing rows for ``dt`` and write the combined result.

    Args:
        dt: Day to process. It is interpolated verbatim into the SQL text,
            so it must come from a trusted source (the callers in this file
            pass values produced by ``DateUtils``).
            NOTE(review): switch to bound parameters if the DB helper
            supports them for ``get_data_list`` / ``execute``.
    """
    sql = f"""SELECT a.dt,b.type,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
            title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
            from ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            where a.dt='{dt}' and c.is_video=0
            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
            """
    data = db.quchen_text.get_data_list(sql)

    # Collect every distinct image id referenced by the day's rows.
    image_ids = set()
    for row in data:
        image_ids.update(row[-1].split(','))

    # Map image_id -> (preview_url, signature).
    # The lookup is skipped when there is nothing to look up: str(set())
    # is "set()", which would otherwise produce the invalid SQL "in (et()".
    # TODO: if a single day produces too many images, this statement may
    # exceed the SQL length limit.
    image_di = {}
    if image_ids:
        sql3 = (
            "select image_id,preview_url,signature from image_info "
            f"where image_id in ({str(image_ids)[1:-1]})"
        )
        for image_row in db.quchen_text.getData(sql3):
            image_di[image_row[0]] = (image_row[1], image_row[2])

    # Append the comma-joined preview URLs and signatures of the images that
    # were found, then the constant 0 that fills the last column for these
    # non-video rows.
    for row in data:
        # Resolve the matches before appending: row[-1] changes afterwards.
        matched = [image_di[image_id]
                   for image_id in row[-1].split(',')
                   if image_id in image_di]
        row.append(','.join(info[0] for info in matched))
        row.append(','.join(info[1] for info in matched))
        row.append(0)

    sql_video = f"""SELECT a.dt,b.type,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
            title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1
            from ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            left join video_info g on c.image_id=g.video_id
            where a.dt='{dt}' and c.is_video=1
            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
            """
    data_video = db.quchen_text.get_data_list(sql_video)
    data.extend(data_video)

    # Replace the day's rows: 16 selected columns + url, signature, flag = 19.
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
        data)


def hourly():
    """Run the job for ``du.getNow()``; on failure log it and alert DingTalk."""
    try:
        logging.info('广告数据清洗,开始')
        run(du.getNow())
        logging.info('广告数据清洗,结束')
    except Exception:
        # Keep the traceback in the log; the alert alone does not say why.
        logging.exception("广告数据清洗失败")
        DingTalkUtils.send("广告数据清洗失败")


def day():
    """Re-run the job for every date from ``get_n_days(-10)`` to ``get_n_days(-1)``.

    NOTE(review): presumably the previous ten days up to yesterday — confirm
    against ``DateUtils``.
    """
    logging.info('广告数据清洗,开始')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        run(i)
    logging.info('广告数据清洗,结束')


if __name__ == '__main__':
    # Manual backfill of a fixed date range.
    for i in du.getDateLists('2021-05-01', '2021-05-20'):
        print(i)
        run(i)