from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    """Aggregate one day's ad cost data per creative, enrich non-video rows
    with image metadata, then persist into MySQL ``dw_image_cost_day`` and
    mirror the same rows into ClickHouse.

    :param dt: date string ``'YYYY-MM-DD'``.
        NOTE(review): interpolated into SQL directly; callers only pass
        internally generated dates, but parameterized queries would be safer.
    """
    # Aggregated stats for NON-video creatives (is_video=0).
    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),
                sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
                title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
            from ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            where a.dt='{dt}' and c.is_video=0
            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher"""
    data = db.quchen_text.get_data_list(sql)

    # Collect every image id referenced by the rows (the last column is a
    # comma-separated id list) so their metadata can be fetched in one query.
    ids = []
    for row in data:
        ids.extend(row[-1].split(','))

    # TODO: if one day ever produces too many images this IN-list may exceed
    # the SQL statement size limit; move the storage to hive at that point.
    image_di = {}
    if ids:
        # Quoted, de-duplicated IN list.  (The former str(set(...))[1:-1]
        # trick emitted broken SQL -- "et(" -- whenever the id list was empty.)
        in_list = ','.join(f"'{i}'" for i in set(ids))
        sql3 = ("select image_id,preview_url,signature,width,height,size,`type` "
                f"from image_info where image_id in ({in_list})")
        for x in db.quchen_text.getData(sql3):
            image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])

    # Join the per-image metadata back onto each row; ids missing from
    # image_info get blank placeholders so column counts stay aligned.
    for row in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        img_format = ''        # renamed from `type` -- avoid shadowing the builtin
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for img in row[-1].split(','):
            info = image_di.get(img)
            image_id = image_id + ',' + img
            if info:
                preview_url = preview_url + ',' + info[0]
                signature = signature + ',' + info[1]
                # NOTE(review): width/height keep only the LAST image's values
                # while the other fields accumulate comma-joined -- confirm
                # this is intended before changing it.
                width = str(info[2])
                height = str(info[3])
                size = size + ',' + str(info[4])
                img_format = img_format + ',' + str(info[5])
            else:
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                img_format = img_format + ',' + ' '
        row[-1] = image_id[1:]
        row.append(preview_url[1:])
        row.append(signature[1:])
        row.append(0)          # is_video = 0 for this (image) result set
        row.append(width)
        row.append(height)
        row.append(size[1:])
        row.append(img_format[1:])
        row.append(video_length)
        row.append(video_bit_rate)
        row.append(video_meta_data)
        row.append(download_path)

    # Video creatives (is_video=1): video_info already supplies the metadata
    # columns, so these rows come back from SQL in the final column layout.
    sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),
                sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
                title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
                g.preview_url,g.signature,1,
                g.width,g.height,g.`size` ,g.`type` ,g.video_length ,
                g.byte_rate ,g.video_meta_data,g.download_path
            from ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            left join video_info g on c.image_id=g.video_id
            where a.dt='{dt}' and c.is_video=1
            group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher"""
    data_video = db.quchen_text.get_data_list(sql_video)
    data.extend(data_video)

    # Persist to MySQL: wipe the day first, then bulk upsert.
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
            (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
            order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
            preview_url,signature,is_video,width,height,size,format,video_length,
            video_bit_rate,video_meta_data,download_path)
            values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,
            %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)

    # Mirror into ClickHouse: drop the day's partition, then re-insert.
    ck.client.execute(f''' alter table dw_image_cost_day drop partition '{dt}' ''')
    col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
           'follow_count', 'order_count', 'order_amount', 'title', 'description', 'book',
           'platform', 'stage', 'channel', 'pitcher', 'image_id',
           'preview_url', 'signature', 'is_video', 'width', 'height',
           'size', 'format', 'video_length',
           'video_bit_rate', 'video_meta_data', 'download_path']
    # ClickHouse is strict about value types: coerce the date, stringify the
    # is_video flag and geometry/format columns, and cast counters/amounts.
    for row in data:
        row[0] = datetime.strptime(row[0], '%Y-%m-%d')
        row[1] = str(row[1]) if row[1] is not None else None
        row[2] = int(row[2]) if row[2] is not None else None
        row[3] = float(row[3]) if row[3] is not None else None
        row[4] = int(row[4]) if row[4] is not None else None
        row[5] = int(row[5]) if row[5] is not None else None
        row[6] = int(row[6]) if row[6] is not None else None
        row[7] = int(row[7]) if row[7] is not None else None
        row[8] = float(row[8]) if row[8] is not None else None
        row[19] = str(row[19])
        row[20] = str(row[20]) if row[20] is not None else None
        row[21] = str(row[21]) if row[21] is not None else None
        row[22] = str(row[22]) if row[22] is not None else None
        row[23] = str(row[23]) if row[23] is not None else None
    col_str = ','.join(col)
    logging.info('ck填充数据进入')
    ck.client.execute('SET max_partitions_per_insert_block=1000;')
    ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
    logging.info('ck填充数据,结束')


def hourly():
    """Run the cleaning job for today; alert via DingTalk on any failure."""
    try:
        logging.info('广告数据清洗,开始')
        run(du.getNow())
        logging.info('广告数据清洗,结束')
    except Exception as e:
        # logging.exception keeps the traceback (logging.error(str(e)) lost it).
        logging.exception(str(e))
        DingTalkUtils().send("广告数据清洗失败\n" + str(e))


def day():
    """Re-run the cleaning job for the trailing 10 days (backfill window)."""
    logging.info('广告数据清洗,开始')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        run(i)
    logging.info('广告数据清洗,结束')


if __name__ == '__main__':
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also echo to stdout
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    # Backfill example:
    # for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
    #     run(i)
    print(du.getNow())
    run(du.get_n_days(-1))