# dw_image_cost_day.py — daily image/video ad-cost aggregation job
# `handlers` is imported so that `logging.handlers.RotatingFileHandler`
# resolves in the __main__ block below.
from logging import handlers
from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
from datetime import datetime
import logging
# logging.getLogger().setLevel(logging.WARNING)

# Shared client singletons used by every job in this module.
db = MysqlUtils()  # MySQL: reads via db.quchen_text, writes via db.dm
ck = CkUtils()     # ClickHouse client — not referenced in this file's visible code
du = DateUtils()   # date helpers: getNow / get_n_days / getDateLists
  11. def run(dt):
  12. sql = f"""
  13. SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
  14. title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),
  15. g.last_modified_time,g.campaign_id
  16. from
  17. ad_cost_day a
  18. left join ad_info b on a.ad_id=b.ad_id
  19. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  20. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  21. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  22. left join campaign_info g on b.campaign_id = g.campaign_id
  23. where a.dt='{dt}' and (c.is_video=0 or c.is_video is null) and g.campaign_id is not null
  24. group by g.campaign_id
  25. """
  26. # print(sql)
  27. data = db.quchen_text.get_data_list(sql)
  28. # print(data)
  29. # 图片链接拼接
  30. li = []
  31. for i in data:
  32. # print(i)
  33. li.extend(i[16].split(','))
  34. # TODO:之后如果一天产生的图片过多,可能超过sql的字符限制
  35. # TODO:归属人数据有问题
  36. # 之后数据使用hive,来进行数据存储
  37. sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
  38. image_di = {}
  39. image_data = db.quchen_text.getData(sql3)
  40. signature_dict = {} # key signature_id v:(pitcher,last_modified_time)
  41. for x in image_data:
  42. image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
  43. for i in data:
  44. signature_tmp = ''
  45. for j in i[16].split(','):
  46. signature_tmp = signature_tmp + ',' + (image_di.get(j)[1] if image_di.get(j) else ' ')
  47. signature_tmp = signature_tmp[1:]
  48. if signature_tmp not in signature_dict.keys():
  49. signature_dict[signature_tmp] = (i[15], i[17])
  50. else:
  51. sig_last_modified_time = signature_dict[signature_tmp][1]
  52. if sig_last_modified_time is None:
  53. signature_dict[signature_tmp] = (i[15], i[17])
  54. elif i[17] is not None and i[17] < sig_last_modified_time:
  55. signature_dict[signature_tmp] = (i[15], i[17])
  56. # print(image_di)
  57. for i in data:
  58. preview_url = ''
  59. signature = ''
  60. width = '0'
  61. height = '0'
  62. image_id = ''
  63. size = ''
  64. type = ''
  65. video_length = None
  66. video_bit_rate = None
  67. video_meta_data = None
  68. download_path = None
  69. for j in i[16].split(','):
  70. if image_di.get(j):
  71. image_id = image_id + ',' + j
  72. preview_url = preview_url + ',' + image_di.get(j)[0]
  73. signature = signature + ',' + image_di.get(j)[1]
  74. width = str(image_di.get(j)[2])
  75. height = str(image_di.get(j)[3])
  76. size = size + ',' + str(image_di.get(j)[4])
  77. type = type + ',' + str(image_di.get(j)[5])
  78. else:
  79. image_id = image_id + ',' + j
  80. preview_url = preview_url + ',' + ' '
  81. signature = signature + ',' + ' '
  82. width = '0'
  83. height = '0'
  84. size = size + ',' + '0'
  85. type = type + ',' + ' '
  86. signature = signature[1:]
  87. owner = signature_dict[signature][0]
  88. i[16] = image_id[1:]
  89. i.append(preview_url[1:])
  90. i.append(signature)
  91. i.append(0)
  92. i.append(width)
  93. i.append(height)
  94. i.append(size[1:])
  95. i.append(type[1:])
  96. i.append(video_length)
  97. i.append(video_bit_rate)
  98. i.append(video_meta_data)
  99. i.append(download_path)
  100. i.append(i[18])
  101. i.append(owner)
  102. data_new = []
  103. for i in data:
  104. i = i[:17] + i[19:]
  105. # print(i)
  106. data_new.append(i)
  107. data = data_new
  108. # exit(0)
  109. sql_video = f"""
  110. select foo.*,foo2.pitcher as owner from
  111. (SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
  112. title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
  113. g.width,g.height,g.`size` ,g.`type` as video_type ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
  114. ,min(h.last_modified_time) as last_modified_time , h.campaign_id
  115. from
  116. ad_cost_day a
  117. left join ad_info b on a.ad_id=b.ad_id
  118. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  119. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  120. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  121. left join video_info g on c.image_id=g.video_id
  122. left join campaign_info h on b.campaign_id = h.campaign_id
  123. where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
  124. group by h.campaign_id) as foo
  125. inner join
  126. (select pitcher,min(h.last_modified_time) as last_modified_time
  127. from
  128. ad_cost_day a
  129. left join ad_info b on a.ad_id=b.ad_id
  130. left join adcreative_info c on b.adcreative_id=c.adcreative_id
  131. left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
  132. left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
  133. left join video_info g on c.image_id=g.video_id
  134. left join campaign_info h on b.campaign_id = h.campaign_id
  135. where a.dt='{dt}' and c.is_video=1 and h.campaign_id is not null
  136. group by pitcher,h.last_modified_time ) as foo2
  137. on foo.pitcher=foo2.pitcher and foo.last_modified_time=foo2.last_modified_time
  138. """
  139. data_video = db.quchen_text.get_data_list(sql_video)
  140. data_new = []
  141. for i in data_video:
  142. i = i[:-3] + i[-2:]
  143. data_new.append(i)
  144. data.extend(data_new)
  145. # 进行数据存储
  146. db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
  147. db.dm.executeMany(
  148. '''replace into dw_image_cost_day
  149. (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
  150. order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
  151. preview_url,signature,is_video,width,height,size,format,video_length,
  152. video_bit_rate,video_meta_data,download_path,campaign_id,owner)
  153. values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
  154. data)
  155. def hourly():
  156. try:
  157. logging.info('广告数据清洗,开始')
  158. run(du.getNow())
  159. logging.info('广告数据清洗,结束')
  160. except Exception as e:
  161. logging.error(str(e))
  162. DingTalkUtils().send("广告数据清洗失败\n" + str(e))
  163. def day():
  164. logging.info('广告数据清洗,开始')
  165. for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
  166. # print(i)
  167. run(i)
  168. logging.info('广告数据清洗,结束')
  169. if __name__ == '__main__':
  170. # run('2021-05-18')
  171. logging.basicConfig(
  172. handlers=[
  173. logging.handlers.RotatingFileHandler('.ad_hourly.log',
  174. maxBytes=10 * 1024 * 1024,
  175. backupCount=5,
  176. encoding='utf-8')
  177. , logging.StreamHandler() # 供输出使用
  178. ],
  179. level=logging.INFO,
  180. format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
  181. )
  182. # -495
  183. #
  184. # for i in du.getDateLists(du.get_n_days(-495), du.get_n_days(0)):
  185. # print(i)
  186. # # exit()
  187. # run(i)
  188. # print(du.get_n_days(-20))
  189. run(du.get_n_days(0))