# dw_image_cost_day.py

from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    sql = f"""
    SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
        g.last_modified_time,g.campaign_id
    from ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join campaign_info g on b.campaign_id = g.campaign_id
    where a.dt='{dt}' and c.is_video=0 and g.campaign_id is not null
    group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
    """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)
    # print(data)

    # collect every image id referenced by the day's rows for a single lookup
    li = []
    for i in data:
        # print(i)
        li.extend(i[16].split(','))

    # TODO: if one day ever produces too many images, the IN(...) list may exceed
    #       the SQL statement length limit; move storage to Hive later
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
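    # A minimal sketch of how the TODO above could be handled by chunking the
    # IN(...) lookup rather than interpolating every id at once. The batch size
    # of 1000 is an assumption, and it presumes getData accepts any SELECT:
    #
    #   image_data = []
    #   ids = list(set(li))
    #   for pos in range(0, len(ids), 1000):
    #       chunk = str(ids[pos:pos + 1000])[1:-1]
    #       image_data.extend(db.quchen_text.getData(
    #           f"select image_id,preview_url,signature,width,height,size,`type` "
    #           f"from image_info where image_id in ({chunk})"))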
    signature_dict = {}  # key: signature, value: (pitcher, last_modified_time, campaign_id)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])

    # For each row, build the comma-joined signature of its images and keep, per
    # signature, the (pitcher, last_modified_time, campaign_id) triple with the
    # earliest non-null last_modified_time.
    for i in data:
        signature_tmp = ''
        for j in i[16].split(','):
            signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
        signature_tmp = signature_tmp[1:]
        if signature_tmp not in signature_dict:
            signature_dict[signature_tmp] = (i[15], i[17], i[18])
        else:
            sig_last_modified_time = signature_dict[signature_tmp][1]
            if sig_last_modified_time is None:
                signature_dict[signature_tmp] = (i[15], i[17], i[18])
            elif i[17] is not None and i[17] < sig_last_modified_time:
                signature_dict[signature_tmp] = (i[15], i[17], i[18])
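    # Example of the rule above: if pitcher A first used an image set on
    # 2021-05-01 and pitcher B reused the same set on 2021-05-03, the signature
    # stays attributed to A, because A holds the earlier last_modified_time.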
    # print(image_di)

    # Rebuild each row's image columns from image_di and append the derived fields.
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        type = ''
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[16].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                # width/height are overwritten on every pass, so only the last
                # image's dimensions are kept
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                type = type + ',' + str(image_di.get(j)[5])
            else:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                type = type + ',' + ' '
        signature = signature[1:]
        owner = signature_dict[signature][0]
        campaign_id = signature_dict[signature][2]
        i[16] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature)
        i.append(0)  # is_video
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
        i.append(campaign_id)
        i.append(owner)
    # Drop the helper columns fetched only for ownership resolution
    # (index 17 last_modified_time, index 18 campaign_id); the appended fields stay.
    data_new = []
    for i in data:
        i = i[:17] + i[19:]
        data_new.append(i)
    data = data_new
    # exit(0)
    sql_video = f"""
    select foo.*,foo2.pitcher as owner from
    (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
        g.width,g.height,g.`size`,g.`type` as video_type,g.video_length,g.byte_rate,g.video_meta_data,g.download_path,
        min(h.last_modified_time) as last_modified_time, h.campaign_id
    from ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join video_info g on c.image_id=g.video_id
        left join campaign_info h on b.campaign_id = h.campaign_id
    where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
    group by a.dt,b.type,title,description,
        book,platform,stage,image_id,e.channel,pitcher) as foo
    inner join
    (select pitcher,min(h.last_modified_time) as last_modified_time
    from ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join video_info g on c.image_id=g.video_id
        left join campaign_info h on b.campaign_id = h.campaign_id
    where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
    group by pitcher,h.last_modified_time) as foo2
    on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
    """
    data_video = db.quchen_text.get_data_list(sql_video)

    # drop the trailing last_modified_time column, keeping campaign_id and owner
    data_new = []
    for i in data_video:
        i = i[:-3] + i[-2:]
        data_new.append(i)
    data.extend(data_new)

    # persist the data
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
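    # dt is interpolated straight into the DELETE above; if the MysqlUtils
    # wrapper forwards parameters to the driver (an assumption about its API),
    # the safer form would be:
    #   db.dm.execute('delete from dw_image_cost_day where dt=%s', (dt,))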
    db.dm.executeMany(
        '''replace into dw_image_cost_day
        (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
        order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
        preview_url,signature,is_video,width,height,size,format,video_length,
        video_bit_rate,video_meta_data,download_path,campaign_id,owner)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)

    # # image ownership reassignment:
    # # for rows older than five days, or with cost above 5000, set owner to null
    # owner_sql = '''
    # UPDATE dw_image_cost_day
    # set owner = null
    # WHERE
    #     dt<date_add(now(),interval -5 day) or cost>5000
    # '''
    # db.dm.execute(owner_sql)
    # also keep a copy of the data in ClickHouse
    # 1. drop the partition for the given day (drop + re-insert is not atomic,
    #    so readers may briefly see the day missing)
    ck.client.execute(f''' alter table dw_image_cost_day drop partition '{dt}' ''')

    col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
           'follow_count', 'order_count', 'order_amount', 'title', 'description',
           'book', 'platform', 'stage', 'channel', 'pitcher', 'image_id', 'preview_url',
           'signature', 'is_video', 'width', 'height', 'size', 'format', 'video_length',
           'video_bit_rate', 'video_meta_data', 'download_path', 'campaign_id', 'owner']

    # 2. normalize the row types before inserting into ClickHouse
    for _ in data:
        # data = [col if col else 'null' for col in data]
        _[0] = datetime.strptime(_[0], '%Y-%m-%d')
        _[1] = str(_[1]) if _[1] is not None else None
        _[2] = int(_[2]) if _[2] is not None else None
        _[3] = float(_[3]) if _[3] is not None else None
        _[4] = int(_[4]) if _[4] is not None else None
        _[5] = int(_[5]) if _[5] is not None else None
        _[6] = int(_[6]) if _[6] is not None else None
        _[7] = int(_[7]) if _[7] is not None else None
        _[8] = float(_[8]) if _[8] is not None else None
        _[19] = str(_[19])  # is_video
        _[20] = str(_[20]) if _[20] is not None else None  # width
        _[21] = str(_[21]) if _[21] is not None else None  # height
        _[22] = str(_[22]) if _[22] is not None else None  # size
        _[23] = str(_[23]) if _[23] is not None else None  # format
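    # The positional conversions above could equivalently be keyed by column
    # name, which is less fragile if the column order changes (a sketch that
    # mirrors the conversions performed above):
    #
    #   converters = {'dt': lambda v: datetime.strptime(v, '%Y-%m-%d'),
    #                 'type': str, 'use_times': int, 'cost': float,
    #                 'view_count': int, 'click_count': int, 'follow_count': int,
    #                 'order_count': int, 'order_amount': float, 'is_video': str,
    #                 'width': str, 'height': str, 'size': str, 'format': str}
    #   for row in data:
    #       for idx, name in enumerate(col):
    #           fn = converters.get(name)
    #           if fn is not None and row[idx] is not None:
    #               row[idx] = fn(row[idx])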
    col_str = ','.join(col)
    logging.info('inserting rows into ClickHouse')
    ck.client.execute('SET max_partitions_per_insert_block=1000;')
    ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
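    # clickhouse-driver also accepts per-query settings, so the SET statement
    # above could instead be passed with the documented settings= parameter:
    #   ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str),
    #                     data, settings={'max_partitions_per_insert_block': 1000})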
    logging.info('ClickHouse insert finished')


def hourly():
    try:
        logging.info('ad data cleaning started')
        run(du.getNow())
        logging.info('ad data cleaning finished')
    except Exception as e:
        logging.error(str(e))
        DingTalkUtils().send("ad data cleaning failed\n" + str(e))


def day():
    logging.info('ad data cleaning started')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('ad data cleaning finished')


if __name__ == '__main__':
    # run('2021-05-18')
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also log to stdout
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )

    # backfill the last 495 days:
    # for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
    #     print(i)
    #     run(i)
    # print(du.get_n_days(-20))
    run(du.get_n_days(0))