# dw_image_cost_day.py

import logging
import logging.handlers
from datetime import datetime

from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
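    """Aggregate one day's ad spend per creative and persist it.

    Rows are built from MySQL (image creatives first, then videos),
    written to the MySQL table dw_image_cost_day, and mirrored into
    ClickHouse.
    """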
    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
        from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        where a.dt='{dt}' and c.is_video=0
        group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
        """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)
    # print(data)
    # Collect every image_id referenced by the day's rows.
    li = []
    for i in data:
        # print(i)
        li.extend(i[-1].split(','))
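    # Look up the metadata of all collected images in a single query;
    # str(set(li))[1:-1] renders the set as a quoted, comma-separated
    # list that slots directly into the IN (...) clause.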
    # TODO: if one day ever produces too many images, this may exceed the
    #       SQL statement length limit; move this data into Hive later.
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    # image_id -> (preview_url, signature, width, height, size, type)
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
    # print(image_di)
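    # Enrich each row in place: per-image fields are concatenated with a
    # leading comma (stripped below); ids missing from image_info get
    # blank placeholders so the column counts stay aligned.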
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        img_type = ''
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[-1].split(','):
            if image_di.get(j):
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + image_di.get(j)[0]
                signature = signature + ',' + image_di.get(j)[1]
                width = str(image_di.get(j)[2])
                height = str(image_di.get(j)[3])
                size = size + ',' + str(image_di.get(j)[4])
                img_type = img_type + ',' + str(image_di.get(j)[5])
            else:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                img_type = img_type + ',' + ' '
        i[-1] = image_id[1:]  # strip the leading comma
        i.append(preview_url[1:])
        i.append(signature[1:])
        i.append(0)  # is_video
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(img_type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
    # exit(0)
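    # Video creatives join video_info directly, so their rows come back
    # with the preview/metadata columns already populated and is_video=1.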
    sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
        title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
        g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
        from
        ad_cost_day a
        left join ad_info b on a.ad_id=b.ad_id
        left join adcreative_info c on b.adcreative_id=c.adcreative_id
        left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
        left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
        left join video_info g on c.image_id=g.video_id
        where a.dt='{dt}' and c.is_video=1
        group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
    data_video = db.quchen_text.get_data_list(sql_video)
    data.extend(data_video)
    # Store the merged rows in MySQL; deleting the day first keeps the job idempotent.
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        '''replace into dw_image_cost_day
        (dt,type,use_times,cost,view_count,click_count,follow_count,order_count,
        order_amount,title,description,book,platform,stage,channel,pitcher,image_id,
        preview_url,signature,is_video,width,height,size,format,video_length,
        video_bit_rate,video_meta_data,download_path)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
        data)
    # Mirror the same data into ClickHouse.
    # 1. Find every partition of the table that touches this day.
    ck_partitions = ck.execute(f'''
        select partition
        from `system`.parts p
        where table='dw_image_cost_day'
        and `partition` like '%{dt}%'
        ''')
    # 2. Drop those partitions before re-inserting the day's rows.
    for i in ck_partitions:
        ck.client.execute(''' alter table dw_image_cost_day drop partition ''' + i[0])
    col = ['dt', 'type', 'use_times', 'cost', 'view_count', 'click_count',
           'follow_count', 'order_count', 'order_amount', 'title', 'description',
           'book', 'platform', 'stage', 'channel', 'pitcher', 'image_id', 'preview_url',
           'signature', 'is_video', 'width', 'height', 'size', 'format', 'video_length',
           'video_bit_rate', 'video_meta_data', 'download_path']
    # Cast each row to the column types ClickHouse expects before inserting.
    for row in data:
        # data = [col if col else 'null' for col in data]
        row[0] = datetime.strptime(row[0], '%Y-%m-%d')            # dt
        row[1] = str(row[1]) if row[1] is not None else None      # type
        row[2] = int(row[2]) if row[2] is not None else None      # use_times
        row[3] = float(row[3]) if row[3] is not None else None    # cost
        row[4] = int(row[4]) if row[4] is not None else None      # view_count
        row[5] = int(row[5]) if row[5] is not None else None      # click_count
        row[6] = int(row[6]) if row[6] is not None else None      # follow_count
        row[7] = int(row[7]) if row[7] is not None else None      # order_count
        row[8] = float(row[8]) if row[8] is not None else None    # order_amount
        row[19] = str(row[19])                                    # is_video
        row[20] = str(row[20]) if row[20] is not None else None   # width
        row[21] = str(row[21]) if row[21] is not None else None   # height
        row[22] = str(row[22]) if row[22] is not None else None   # size
        row[23] = str(row[23]) if row[23] is not None else None   # format
    col_str = ','.join(col)
    logging.info('ClickHouse insert started')
    # Raise the per-insert partition cap: a long backfill can touch many partitions at once.
    ck.client.execute('SET max_partitions_per_insert_block=1000;')
    ck.client.execute('insert into dw_image_cost_day ({}) values'.format(col_str), data)
    logging.info('ClickHouse insert finished')

def hourly():
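    """Hourly entry point: clean the current day; alert DingTalk on failure."""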
    try:
        logging.info('ad data cleaning started')
        run(du.getNow())
        logging.info('ad data cleaning finished')
    except Exception as e:
        logging.error(str(e))
        DingTalkUtils().send("ad data cleaning failed\n" + str(e))

def day():
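    """Daily entry point: re-run the cleaning for each of the last 10 days."""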
    logging.info('ad data cleaning started')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('ad data cleaning finished')

if __name__ == '__main__':
    # run('2021-05-18')
    # -495
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('.ad_hourly.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also log to the console
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    # Backfill: re-run the cleaning for roughly the past year.
    for i in du.getDateLists(du.get_n_days(-370), du.get_n_days(0)):
        print(i)
        run(i)
    # print(du.getNow())
    # run(du.getNow())