123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import logging
- from model.DataBaseUtils import MysqlUtils,CkUtils
- from model.DateUtils import DateUtils
- from model.DingTalkUtils import DingTalkUtils
- # logging.getLogger().setLevel(logging.WARNING)
- import pandas as pd
- db = MysqlUtils()
- ck = CkUtils()
- du=DateUtils()
- def run(dt):
- sql=f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
- title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
- from
- ad_cost_day a
- left join ad_info b on a.ad_id=b.ad_id
- left join adcreative_info c on b.adcreative_id=c.adcreative_id
- left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
- left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
- where a.dt='{dt}' and c.is_video=0
- group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
-
- """
- # print(sql)
- data = db.quchen_text.get_data_list(sql)
- # print(data)
- # 图片链接拼接
- li = []
- for i in data:
- # print(i)
- li.extend(i[-1].split(','))
- #TODO:之后如果一天产生的图片过多,可能超过sql的字符限制,
- # 之后数据使用hive,来进行数据存储
- sql3 = f"select image_id,preview_url,signature,width,height from image_info where image_id in ({str(set(li))[1:-1]})"
- image_di = {}
- image_data = db.quchen_text.getData(sql3)
- for x in image_data:
- image_di[x[0]] = (x[1],x[2],x[3],x[4])
- # print(image_di)
- for i in data:
- preview_url = ''
- signature = ''
- width = ''
- height = ''
- image_id = ''
- for j in i[-1].split(','):
- if image_di.get(j):
- image_id = image_id + ',' + j
- preview_url = preview_url + ',' + image_di.get(j)[0]
- signature = signature + ',' + image_di.get(j)[1]
- width = width + ',' + str(image_di.get(j)[2])
- height = height + ',' + str(image_di.get(j)[3])
- else:
- image_id = image_id + ',' +j
- preview_url = preview_url + ',' + ' '
- signature = signature + ',' + ' '
- width = width + ',' + '0'
- height = height + ',' + '0'
- i[-1]=image_id[1:]
- i.append(preview_url[1:])
- i.append(signature[1:])
- i.append(0)
- i.append(width[1:])
- i.append(height[1:])
- # exit(0)
- sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
- title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
- g.width,g.height
- from
- ad_cost_day a
- left join ad_info b on a.ad_id=b.ad_id
- left join adcreative_info c on b.adcreative_id=c.adcreative_id
- left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
- left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
- left join video_info g on c.image_id=g.video_id
- where a.dt='{dt}' and c.is_video=1
- group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
- data_video = db.quchen_text.get_data_list(sql_video)
- data.extend(data_video)
- #进行数据存储
- db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
- db.dm.executeMany("replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
- def hourly():
- try:
- logging.info('广告数据清洗,开始')
- run(du.getNow())
- logging.info('广告数据清洗,结束')
- except:
- DingTalkUtils().send("广告数据清洗失败")
- def day():
- logging.info('广告数据清洗,开始')
- for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
- # print(i)
- run(i)
- logging.info('广告数据清洗,结束')
- if __name__ == '__main__':
- # run('2021-05-18')
- for i in du.getDateLists('2021-05-01','2021-05-20'):
- print(i)
- run(i)
|