dw_image_cost_day.py

from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()
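

# Builds/refreshes the dw_image_cost_day fact table for a single date: aggregates ad
# cost and engagement per campaign, joins image/video creative metadata, and attributes
# each creative signature to the pitcher ("owner") who used it earliest.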
def run(dt):
    # Non-video (image) creatives: aggregate the day's cost and engagement per campaign
    sql = f"""
    SELECT a.dt,a.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),
    sum(follow_count),sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
    g.created_time,a.campaign_id
    from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join campaign_info g on a.campaign_id = g.campaign_id
    where a.dt='{dt}' and (c.is_video=0 or c.is_video is null)
    and a.campaign_id is not null
    group by a.campaign_id
    """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)

    # Collect every image id referenced today so their metadata can be fetched in one query
    li = []
    for i in data:
        li.extend(i[16].split(','))

    # TODO: if too many images are produced in one day this may exceed the SQL statement
    #       length limit; move this data into Hive later
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
    signature_dict = {}  # key: signature, value: (pitcher, created_time)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])

    # Keep, per signature, the pitcher of the campaign with the earliest created_time
    for i in data:
        signature_tmp = ''
        for j in i[16].split(','):
            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
        signature_tmp = signature_tmp[1:]
        if signature_tmp not in signature_dict:
            signature_dict[signature_tmp] = (i[15], i[17])
        else:
            sig_created_time = signature_dict[signature_tmp][1]
            if sig_created_time is None:
                signature_dict[signature_tmp] = (i[15], i[17])
            elif i[17] is not None and i[17] < sig_created_time:
                signature_dict[signature_tmp] = (i[15], i[17])
    # 1. For each signature, look up the owner recorded on its oldest row in dw_image_cost_day
    signature_list = "'" + "','".join([str(i) for i in signature_dict.keys()]) + "'"
    sql = f'''
    select owner, b.signature as signature from
    (select min(dt) as dt, signature from dw_image_cost_day dicd
    where dt<'{dt}' and length(signature)>1
    and signature in ({signature_list})
    group by signature
    ) as b
    inner join (
    select * from dw_image_cost_day
    where dt<'{dt}' and length(signature)>1
    and signature in ({signature_list})
    ) as a
    on a.dt=b.dt and a.signature = b.signature
    group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    for i in signature_info:
        owner, signature = i
        signature_dict[signature] = (owner, signature_dict[signature][1])
    # 2. If the signature is empty (null or just commas like ',,,,'), fall back to the
    #    row's own pitcher as owner
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        type = ''
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        # Concatenate per-image metadata; width/height keep only the last image's value
        for j in i[16].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                type = type + ',' + str(image_di.get(j)[5])
            else:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                type = type + ',' + ' '
        signature = signature[1:]
        pitcher = i[15]
        if len(signature.replace(' ', '').replace(',', '')) == 0:
            owner = pitcher
        else:
            owner = signature_dict[signature][0]
        # Rewrite image_id and append the derived columns onto the row
        i[16] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature)
        i.append(0)  # is_video
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
        i.append(i[18])  # campaign_id
        i.append(owner)

    # Drop the helper columns created_time (index 17) and the original campaign_id (index 18);
    # campaign_id was re-appended near the end of each row above
    data_new = []
    for i in data:
        i = i[:17] + i[19:]
        # print(i)
        data_new.append(i)
    data = data_new
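
    # Video creatives: same per-campaign aggregation joined to video_info; the foo2 subquery
    # picks, per signature, the pitcher whose campaign has the earliest created_time, and that
    # pitcher becomes the default owner for the video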
    sql_video = f""" select foo.*,if(foo2.pitcher,foo2.pitcher,foo.pitcher) as owner from
    (SELECT a.dt,a.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
    g.width,g.height,g.`size`,g.`type` as video_type,g.video_length,g.byte_rate,g.video_meta_data,g.download_path,
    min(h.created_time) as created_time, a.campaign_id
    from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join video_info g on c.image_id=g.video_id
    left join campaign_info h on a.campaign_id = h.campaign_id
    where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
    group by a.campaign_id) as foo
    left join
    (select signature,pitcher from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join video_info g on c.image_id=g.video_id
    left join campaign_info h on a.campaign_id = h.campaign_id
    where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
    and (signature,h.created_time) in
    (select signature,min(h.created_time) as created_time
    from ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on a.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join video_info g on c.image_id=g.video_id
    left join campaign_info h on a.campaign_id = h.campaign_id
    where a.dt='{dt}' and c.is_video=1 and a.campaign_id is not null
    and length(signature)>6
    group by signature)
    group by signature, pitcher) as foo2
    on foo.signature=foo2.signature
    """
    data_video = db.quchen_text.get_data_list(sql_video)

    signature_list = "'" + "','".join([str(i[18]) for i in data if i[18] and len(i[18]) > 6]) + "'"
    sql = f'''
    select owner, b.signature as signature from
    (select min(dt) as dt, signature from dw_image_cost_day dicd
    where dt<'{dt}' and length(signature)>1
    and signature in ({signature_list})
    group by signature
    ) as b
    inner join (
    select * from dw_image_cost_day
    where dt<'{dt}' and length(signature)>1
    and signature in ({signature_list})
    ) as a
    on a.dt=b.dt and a.signature = b.signature
    group by signature, owner
    '''
    signature_info = db.dm.get_data_list(sql)
    signature_dict_video = {}
    for i in signature_info:
        owner, signature = i
        signature_dict_video[signature] = owner

    # Drop created_time (third column from the end) and apply the historical owner where known
    data_new = []
    for i in data_video:
        i = i[:-3] + i[-2:]
        signature = i[18]
        if signature in signature_dict_video:
            i[-1] = signature_dict_video[signature]
        data_new.append(i)
    data.extend(data_new)

    # Persist: clear this date's partition, then bulk insert the combined image and video rows
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
        (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
        order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
        preview_url,signature,is_video,width,height,size,format,video_length,
        video_bit_rate,video_meta_data,download_path,campaign_id,owner)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)
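

# Entry points: hourly() refreshes today's partition (alerting via DingTalk on failure),
# day() backfills the previous ten days.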
def hourly():
    try:
        logging.info('广告数据清洗,开始')
        run(du.getNow())
        logging.info('广告数据清洗,结束')
    except Exception as e:
        logging.error(str(e))
        DingTalkUtils().send("广告数据清洗失败\n" + str(e))


def day():
    logging.info('广告数据清洗,开始')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('广告数据清洗,结束')


if __name__ == '__main__':
    # run('2021-05-18')
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also log to the console
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    # -495
    # Full backfill: rebuild the last 365 days one date at a time
    for i in du.getDateLists(du.get_n_days(-365), du.get_n_days(0)):
        print(i)
        # exit()
        run(i)
    # print(du.get_n_days(-20))
    # run(du.get_n_days(0))
    # print(du.get_n_days(-30))
    # print(du.get_n_days(-3))
    # run(du.get_n_days(-3))