dw_image_cost_day.py

from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()
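

# run(dt): rebuild the dw_image_cost_day rows for a single day.
# It aggregates ad_cost_day by campaign, attaches image/video creative info,
# resolves an "owner" (the pitcher who first used a creative signature),
# and replaces that day's rows in dw_image_cost_day.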
def run(dt):
    sql = f"""
    SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),
        sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
        g.created_time,g.campaign_id
    from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join campaign_info g on b.campaign_id = g.campaign_id
    where a.dt='{dt}' and (c.is_video=0 or c.is_video is null) and g.campaign_id is not null
    group by g.campaign_id
    """
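    # Result columns (by index): 0 dt, 1 type, 2 use_times (ct), 3 cost, 4 view_count,
    # 5 click_count, 6 follow_count, 7 order_count, 8 order_amount, 9 title,
    # 10 description, 11 book, 12 platform, 13 stage, 14 channel, 15 pitcher,
    # 16 image_id, 17 created_time, 18 campaign_id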
    # print(sql)
    data = db.quchen_text.get_data_list(sql)
    # print(data)

    # Collect image ids: one row may reference several image_ids (comma separated)
    li = []
    for i in data:
        # print(i)
        li.extend(i[16].split(','))
    # TODO: if one day produces too many images, the IN list below may exceed the
    #       SQL statement length limit; the data should then be moved to hive for storage.
    # str(set(li))[1:-1] renders "'id1', 'id2', ..." for the IN clause.
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
    signature_dict = {}  # key: signature, value: (pitcher, created_time)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
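
    # For each ad row, build the comma-joined signature of all its images and remember,
    # per signature, the pitcher whose campaign has the earliest created_time.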
    for i in data:
        signature_tmp = ''
        for j in i[16].split(','):
            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
        signature_tmp = signature_tmp[1:]
        if signature_tmp not in signature_dict.keys():
            signature_dict[signature_tmp] = (i[15], i[17])
        else:
            sig_created_time = signature_dict[signature_tmp][1]
            if sig_created_time is None:
                signature_dict[signature_tmp] = (i[15], i[17])
            elif i[17] is not None and i[17] < sig_created_time:
                signature_dict[signature_tmp] = (i[15], i[17])
    # 1. For each signature, look up the owner of its oldest occurrence in dw_image_cost_day
    signature_list = "'" + "','".join([str(i) for i in signature_dict.keys()]) + "'"
    sql = f'''
    select owner, b.signature as signature from
        (select min(dt) as dt, signature from dw_image_cost_day dicd
            where dt<'{dt}' and length(signature)>1
            and signature in ({signature_list})
            group by signature
        ) as b
        inner join (
            select * from dw_image_cost_day
            where dt<'{dt}' and length(signature)>1
            and signature in ({signature_list})
        ) as a
        on a.dt=b.dt and a.signature = b.signature
    group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    for i in signature_info:
        owner, signature = i
        signature_dict[signature] = (owner, signature_dict[signature][1])
    # 2. If a signature is empty or only commas (e.g. ',,,,'), the owner falls back to the row's own pitcher
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        type = ''
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[16].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                type = type + ',' + str(image_di.get(j)[5])
            else:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                type = type + ',' + ' '
        signature = signature[1:]
        pitcher = i[15]
        if len(signature.replace(' ', '').replace(',', '')) == 0:
            owner = pitcher
        else:
            owner = signature_dict[signature][0]
        i[16] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature)
        i.append(0)  # is_video = 0 for image creatives
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
        i.append(i[18])  # campaign_id
        i.append(owner)
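
    # Drop the raw created_time / campaign_id columns (indices 17 and 18); the
    # campaign_id appended above stays, so the row layout matches the INSERT below.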
    data_new = []
    for i in data:
        i = i[:17] + i[19:]
        # print(i)
        data_new.append(i)
    data = data_new
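
    # Video creatives: same aggregation, joined against video_info instead of image_info.
    # Subquery foo2 picks, per signature, the pitcher whose campaign has the earliest created_time,
    # which becomes the provisional owner.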
    sql_video = f""" select foo.*, if(foo2.pitcher, foo2.pitcher, foo.pitcher) as owner from
        (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
            title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
            g.width,g.height,g.`size`,g.`type` as video_type,g.video_length,g.byte_rate,g.video_meta_data,g.download_path,
            min(h.created_time) as created_time, h.campaign_id
        from
            ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            left join video_info g on c.image_id=g.video_id
            left join campaign_info h on b.campaign_id = h.campaign_id
        where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
        group by h.campaign_id) as foo
        inner join
        (select signature, pitcher from ad_cost_day a
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            left join video_info g on c.image_id=g.video_id
            left join campaign_info h on b.campaign_id = h.campaign_id
            where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
            and (signature, h.created_time) in
                (select signature, min(h.created_time) as created_time
                from
                    ad_cost_day a
                    left join ad_info b on a.ad_id=b.ad_id
                    left join adcreative_info c on b.adcreative_id=c.adcreative_id
                    left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
                    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
                    left join video_info g on c.image_id=g.video_id
                    left join campaign_info h on b.campaign_id = h.campaign_id
                where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
                and length(signature)>6
                group by signature)
            group by signature, pitcher) as foo2
        on foo.signature=foo2.signature
    """
    data_video = db.quchen_text.get_data_list(sql_video)

    # NOTE: this list is built from the image rows' signatures (data); if video signatures
    # are what should be looked up here, data_video may be the intended source.
    signature_list = "'" + "','".join([str(i[18]) for i in data if i[18] and len(i[18]) > 6]) + "'"
    sql = f'''
    select owner, b.signature as signature from
        (select min(dt) as dt, signature from dw_image_cost_day dicd
            where dt<'{dt}' and length(signature)>1
            and signature in ({signature_list})
            group by signature
        ) as b
        inner join (
            select * from dw_image_cost_day
            where dt<'{dt}' and length(signature)>1
            and signature in ({signature_list})
        ) as a
        on a.dt=b.dt and a.signature = b.signature
    group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    signature_dict_video = {}
    for i in signature_info:
        owner, signature = i
        signature_dict_video[signature] = owner
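
    # Drop the created_time column from each video row and, where the signature already has a
    # historical owner in dw_image_cost_day, override the owner computed by sql_video.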
    data_new = []
    for i in data_video:
        i = i[:-3] + i[-2:]
        signature = i[18]
        if signature in signature_dict_video.keys():
            i[-1] = signature_dict_video[signature]
        data_new.append(i)
    data.extend(data_new)
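
    # Image and video rows are now both in `data`, in the column order of the INSERT below.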
    # Persist the results: delete the day's existing rows first so reruns for the same dt do not duplicate
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
        (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
        order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
        preview_url,signature,is_video,width,height,size,format,video_length,
        video_bit_rate,video_meta_data,download_path,campaign_id,owner)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)
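
    # Optional sanity check (sketch; assumes db.dm.get_data_list works for ad-hoc selects,
    # as it does for the owner lookups above):
    # rows = db.dm.get_data_list(f"select count(*), sum(cost) from dw_image_cost_day where dt='{dt}'")
    # logging.info(f"dw_image_cost_day {dt}: {rows}")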


def hourly():
    try:
        logging.info('广告数据清洗,开始')  # ad data cleaning: start
        run(du.getNow())
        logging.info('广告数据清洗,结束')  # ad data cleaning: finished
    except Exception as e:
        logging.error(str(e))
        DingTalkUtils().send("广告数据清洗失败\n" + str(e))  # notify DingTalk: ad data cleaning failed


def day():
    logging.info('广告数据清洗,开始')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('广告数据清洗,结束')


if __name__ == '__main__':
    # run('2021-05-18')
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also log to stdout
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )

    # Backfill: rerun every day from 495 days ago up to today
    for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
        print(i)
        # exit()
        run(i)

    # print(du.get_n_days(-20))
    # run(du.get_n_days(0))
    # print(du.get_n_days(-30))
    # run(du.get_n_days(-30))