dw_image_cost_day.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. from logging import handlers
  2. from model.DataBaseUtils import MysqlUtils, CkUtils
  3. from model.DateUtils import DateUtils
  4. from model.DingTalkUtils import DingTalkUtils
  5. from datetime import datetime
  6. import logging
  7. # logging.getLogger().setLevel(logging.WARNING)
  8. db = MysqlUtils()
  9. ck = CkUtils()
  10. du = DateUtils()
  11. def run(dt):
  12. sql = f"""
  13. SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
  14. title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
  15. g.last_modified_time,g.campaign_id
  16. from
  17. ad_cost_day a
  18. left join ad_info b on a.ad_id=b.ad_id
  19. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  20. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  21. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  22. left join campaign_info g on b.campaign_id = g.campaign_id
  23. where a.dt='{dt}' and c.is_video=0 and g.campaign_id is not null
  24. group by g.campaign_id
  25. """
  26. # print(sql)
  27. data = db.quchen_text.get_data_list(sql)
  28. # print(data)
  29. # 图片链接拼接
  30. li = []
  31. for i in data:
  32. # print(i)
  33. li.extend(i[16].split(','))
  34. # TODO:之后如果一天产生的图片过多,可能超过sql的字符限制
  35. # 之后数据使用hive,来进行数据存储
  36. sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
  37. image_di = {}
  38. image_data = db.quchen_text.getData(sql3)
  39. signature_dict = {} # key signature_id v:(pitcher,last_modified_time)
  40. for x in image_data:
  41. image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
  42. for i in data:
  43. signature_tmp = ''
  44. for j in i[16].split(','):
  45. signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
  46. signature_tmp = signature_tmp[1:]
  47. if signature_tmp not in signature_dict.keys():
  48. signature_dict[signature_tmp] = (i[15], i[17])
  49. else:
  50. sig_last_modified_time = signature_dict[signature_tmp][1]
  51. if sig_last_modified_time is None:
  52. signature_dict[signature_tmp] = (i[15], i[17])
  53. elif i[17] is not None and i[17] < sig_last_modified_time:
  54. signature_dict[signature_tmp] = (i[15], i[17])
  55. # print(image_di)
  56. for i in data:
  57. preview_url = ''
  58. signature = ''
  59. width = '0'
  60. height = '0'
  61. image_id = ''
  62. size = ''
  63. type = ''
  64. video_length = None
  65. video_bit_rate = None
  66. video_meta_data = None
  67. download_path = None
  68. for j in i[16].split(','):
  69. if image_di.get(j):
  70. image_id = image_id + ',' + j
  71. preview_url = preview_url + ',' + image_di.get(j)[0]
  72. signature = signature + ',' + image_di.get(j)[1]
  73. width = str(image_di.get(j)[2])
  74. height = str(image_di.get(j)[3])
  75. size = size + ',' + str(image_di.get(j)[4])
  76. type = type + ',' + str(image_di.get(j)[5])
  77. else:
  78. image_id = image_id + ',' + j
  79. preview_url = preview_url + ',' + ' '
  80. signature = signature + ',' + ' '
  81. width = '0'
  82. height = '0'
  83. size = size + ',' + '0'
  84. type = type + ',' + ' '
  85. signature = signature[1:]
  86. owner = signature_dict[signature][0]
  87. i[16] = image_id[1:]
  88. i.append(preview_url[1:])
  89. i.append(signature)
  90. i.append(0)
  91. i.append(width)
  92. i.append(height)
  93. i.append(size[1:])
  94. i.append(type[1:])
  95. i.append(video_length)
  96. i.append(video_bit_rate)
  97. i.append(video_meta_data)
  98. i.append(download_path)
  99. i.append(i[18])
  100. i.append(owner)
  101. data_new = []
  102. for i in data:
  103. i = i[:17] + i[19:]
  104. # print(i)
  105. data_new.append(i)
  106. data = data_new
  107. # exit(0)
  108. sql_video = f"""
  109. select foo.*,foo2.pitcher as owner from
  110. (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
  111. title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
  112. g.width,g.height,g.`size` ,g.`type` as video_type ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
  113. ,min(h.last_modified_time) as last_modified_time , h.campaign_id
  114. from
  115. ad_cost_day a
  116. left join ad_info b on a.ad_id=b.ad_id
  117. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  118. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  119. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  120. left join video_info g on c.image_id=g.video_id
  121. left join campaign_info h on b.campaign_id = h.campaign_id
  122. where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
  123. group by h.campaign_id) as foo
  124. inner join
  125. (select pitcher,min(h.last_modified_time) as last_modified_time
  126. from
  127. ad_cost_day a
  128. left join ad_info b on a.ad_id=b.ad_id
  129. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  130. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  131. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  132. left join video_info g on c.image_id=g.video_id
  133. left join campaign_info h on b.campaign_id = h.campaign_id
  134. where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
  135. group by pitcher,h.last_modified_time ) as foo2
  136. on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
  137. """
  138. data_video = db.quchen_text.get_data_list(sql_video)
  139. data_new = []
  140. for i in data_video:
  141. i = i[:-3] + i[-2:]
  142. data_new.append(i)
  143. data.extend(data_new)
  144. # 进行数据存储
  145. db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
  146. db.dm.executeMany(
  147. '''replace into dw_image_cost_day
  148. (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
  149. order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
  150. preview_url,signature,is_video,width,height,size,format,video_length,
  151. video_bit_rate,video_meta_data,download_path,campaign_id,owner)
  152. values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
  153. data)
  154. # # 图片数据归属权改变
  155. # # 大于五天的数据进行,消耗超过5000的owner置位null
  156. # owner_sql = '''
  157. # UPDATE dw_image_cost_day
  158. # set owner = null
  159. # WHERE
  160. # dt<date_add(now(),interval -5 day) or cost>5000
  161. # '''
  162. # db.dm.execute(owner_sql)
  163. # ck对应数据也保存一份
  164. # 1.进行当天相关的分区删除
  165. ck.client.execute(f''' alter table dw_image_cost_day drop partition '{dt}' ''')
  166. col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
  167. 'follow_count', 'order_count', 'order_amount', 'title', 'description',
  168. 'book', 'platform', 'stage', 'channel', 'pitcher', 'image_id', 'preview_url',
  169. 'signature', 'is_video', 'width', 'height', 'size', 'format', 'video_length',
  170. 'video_bit_rate', 'video_meta_data', 'download_path','campaign_id', 'owner']
  171. # ck存入前进行数据格式化
  172. for _ in data:
  173. # data= [col if col else 'null' for col in data ]
  174. _[0] = datetime.strptime(_[0], '%Y-%m-%d')
  175. _[1] = str(_[1]) if _[1] is not None else None
  176. _[2] = int(_[2]) if _[2] is not None else None
  177. _[3] = float(_[3]) if _[3] is not None else None
  178. _[4] = int(_[4]) if _[4] is not None else None
  179. _[5] = int(_[5]) if _[5] is not None else None
  180. _[6] = int(_[6]) if _[6] is not None else None
  181. _[7] = int(_[7]) if _[7] is not None else None
  182. _[8] = float(_[8]) if _[8] is not None else None
  183. _[19] = str(_[19])
  184. _[20] = str(_[20]) if _[20] is not None else None
  185. _[21] = str(_[21]) if _[21] is not None else None
  186. _[22] = str(_[22]) if _[22] is not None else None
  187. _[23] = str(_[23]) if _[23] is not None else None
  188. col_str = ','.join(col)
  189. logging.info('ck填充数据进入')
  190. ck.client.execute('SET max_partitions_per_insert_block=1000;')
  191. ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
  192. logging.info('ck填充数据,结束')
  193. def hourly():
  194. try:
  195. logging.info('广告数据清洗,开始')
  196. run(du.getNow())
  197. logging.info('广告数据清洗,结束')
  198. except Exception as e:
  199. logging.error(str(e))
  200. DingTalkUtils().send("广告数据清洗失败\n" + str(e))
  201. def day():
  202. logging.info('广告数据清洗,开始')
  203. for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
  204. # print(i)
  205. run(i)
  206. logging.info('广告数据清洗,结束')
  207. if __name__ == '__main__':
  208. # run('2021-05-18')
  209. logging.basicConfig(
  210. handlers=[
  211. logging.handlers.RotatingFileHandler('.ad_hourly.log',
  212. maxBytes=10 * 1024 * 1024,
  213. backupCount=5,
  214. encoding='utf-8')
  215. , logging.StreamHandler() # 供输出使用
  216. ],
  217. level=logging.INFO,
  218. format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
  219. )
  220. # -495
  221. #
  222. for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
  223. print(i)
  224. # exit()
  225. run(i)
  226. # print(du.get_n_days(-20))
  227. # run(du.get_n_days(0))