123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
# Module-level imports and shared singletons for the ad-data cleaning job.
from logging import handlers  # side effect: makes logging.handlers resolvable below
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)

# Shared helper instances used by every function in this script.
db = MysqlUtils()  # MySQL access: quchen_text (source) and dm (destination)
ck = CkUtils()     # ClickHouse access — not used in this section; presumably kept for other jobs
du = DateUtils()   # date helpers: getNow / get_n_days / getDateLists
def run(dt):
    """Clean one day's ad cost data into ``dw_image_cost_day``.

    Two passes: (1) image (non-video) creatives, enriched from image_info
    and with an "owner" resolved per creative signature; (2) video
    creatives, fetched fully joined in SQL. Both result sets are written
    to dm.dw_image_cost_day via delete-then-replace for the given date.

    :param dt: date string ('YYYY-MM-DD') selecting the ad_cost_day slice.
    """
    # ---- pass 1: image (non-video) creatives, one row per campaign ----
    # Row layout (positional): 0 dt, 1 type, 2 count, 3 cost, 4 view,
    # 5 click, 6 follow, 7 order_count, 8 order_amount, 9 title,
    # 10 description, 11 book, 12 platform, 13 stage, 14 channel,
    # 15 pitcher, 16 image_id (comma-separated), 17 last_modified_time,
    # 18 campaign_id.
    sql = f"""
        SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
        g.last_modified_time,g.campaign_id
        from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join campaign_info g on b.campaign_id = g.campaign_id
        where a.dt='{dt}' and (c.is_video=0 or c.is_video is null) and g.campaign_id is not null
        group by g.campaign_id

        """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)
    # print(data)
    # Collect every image_id referenced today (i[16] is comma-separated).
    li = []
    for i in data:
        # print(i)
        li.extend(i[16].split(','))
    # TODO: if one day produces too many images, this IN (...) list may
    #       exceed the SQL statement length limit.
    # TODO: owner attribution data is known to have problems.
    # Plan: move this storage to hive later.
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
    signature_dict = {}  # key: joined signature string, value: (pitcher, last_modified_time)
    for x in image_data:
        # image_id -> (preview_url, signature, width, height, size, type)
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
    # Build the comma-joined signature per row and keep, per signature,
    # the pitcher whose campaign has the earliest last_modified_time —
    # that pitcher is treated as the creative's owner.
    for i in data:
        signature_tmp = ''
        for j in i[16].split(','):
            # unknown image_id contributes a single space placeholder
            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
        signature_tmp = signature_tmp[1:]
        if signature_tmp not in signature_dict.keys():
            signature_dict[signature_tmp] = (i[15], i[17])
        else:
            sig_last_modified_time = signature_dict[signature_tmp][1]
            if sig_last_modified_time is None:
                signature_dict[signature_tmp] = (i[15], i[17])
            elif i[17] is not None and i[17] < sig_last_modified_time:
                # earlier modification wins ownership
                signature_dict[signature_tmp] = (i[15], i[17])
    # print(image_di)
    # Expand each row with per-image attributes; accumulated fields are
    # comma-joined, width/height keep only the LAST image's value
    # (existing behavior — looks unintentional but preserved; verify).
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        type = ''
        video_length = None      # image rows have no video attributes
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[16].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                type = type + ',' + str(image_di.get(j)[5])
            else:
                # image not found: keep positional alignment with placeholders
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                type = type + ',' + ' '
        signature = signature[1:]  # drop leading comma (matches signature_dict keys)
        owner = signature_dict[signature][0]
        i[16] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature)
        i.append(0)  # is_video = 0 for this branch
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
        i.append(i[18])  # campaign_id, re-appended at the tail
        i.append(owner)
    # Drop original positions 17 (last_modified_time) and 18 (campaign_id);
    # campaign_id already lives at the tail now.
    data_new = []
    for i in data:
        i = i[:17] + i[19:]
        # print(i)
        data_new.append(i)
    data = data_new
    # exit(0)
    # ---- pass 2: video creatives, fully joined in SQL; the inner join
    # against foo2 resolves the owner as the pitcher holding the earliest
    # last_modified_time per campaign.
    sql_video = f"""
        select foo.*,foo2.pitcher as owner from
        (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
        g.width,g.height,g.`size` ,g.`type` as video_type ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
        ,min(h.last_modified_time) as last_modified_time , h.campaign_id
        from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join video_info g on c.image_id=g.video_id
        left join campaign_info h on b.campaign_id = h.campaign_id
        where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
        group by h.campaign_id) as foo
        inner join
        (select pitcher,min(h.last_modified_time) as last_modified_time
        from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join video_info g on c.image_id=g.video_id
        left join campaign_info h on b.campaign_id = h.campaign_id
        where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
        group by pitcher,h.last_modified_time ) as foo2
        on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
        """
    data_video = db.quchen_text.get_data_list(sql_video)
    # Drop last_modified_time (third from the end), keeping campaign_id
    # and owner, so video rows line up with the image rows' layout.
    data_new = []
    for i in data_video:
        i = i[:-3] + i[-2:]
        data_new.append(i)
    data.extend(data_new)
    # Store: clear the date slice, then bulk-replace both passes' rows.
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
        (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
        order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
        preview_url,signature,is_video,width,height,size,format,video_length,
        video_bit_rate,video_meta_data,download_path,campaign_id,owner)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)
def hourly():
    """Clean today's ad data; on any failure, log the traceback and
    send a DingTalk alert instead of raising.

    Top-level job boundary: catching broad Exception here is deliberate
    so the scheduler keeps running.
    """
    try:
        logging.info('广告数据清洗,开始')
        run(du.getNow())
        logging.info('广告数据清洗,结束')
    except Exception as e:
        # logging.exception keeps the full traceback; plain error(str(e))
        # discarded it, which made failures hard to diagnose.
        logging.exception('广告数据清洗失败')
        DingTalkUtils().send("广告数据清洗失败\n" + str(e))
def day():
    """Backfill: re-run the cleaning for each of the last 10 days."""
    logging.info('广告数据清洗,开始')
    date_list = du.getDateLists(du.get_n_days(-10), du.get_n_days(-1))
    for date_str in date_list:
        run(date_str)
    logging.info('广告数据清洗,结束')
if __name__ == '__main__':
    # Log both to a rotating file (5 x 10 MB) and to the console.
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # console echo
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    # Clean today's slice. (Historical backfill loops that used to live
    # here as commented-out code were removed; use day() for backfills.)
    run(du.get_n_days(0))
|