dw_image_cost_day.py

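"""Build the dw_image_cost_day fact table for one day.

Aggregates ad cost/behavior metrics per creative, enriches image creatives
with metadata from image_info (video creatives get theirs from video_info in
SQL), and loads the result into dw_image_cost_day.
"""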
import logging

from model.DataBaseUtils import MysqlUtils, CkUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils

# logging.getLogger().setLevel(logging.WARNING)

db = MysqlUtils()
ck = CkUtils()
du = DateUtils()


def run(dt):
    # Non-video creatives: aggregate the day's cost/behavior metrics per
    # creative; the last column (image_id) is a comma-separated list of ids.
    sql = f"""SELECT a.dt,b.type,count(*) as ct,sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,'')
    from
    ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    where a.dt='{dt}' and c.is_video=0
    group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher
    """
    # print(sql)
    data = db.quchen_text.get_data_list(sql)
    # print(data)

    # Collect every image id referenced today so the image details can be
    # fetched in a single query.
    li = []
    for i in data:
        li.extend(i[-1].split(','))

    # TODO: if too many images are generated in one day, the IN list may
    # exceed the SQL statement length limit; move this data to Hive later.
    # str(set(li))[1:-1] turns {'a', 'b'} into "'a', 'b'", i.e. a quoted,
    # de-duplicated IN list.
    sql3 = f"select image_id,preview_url,signature,width,height,size,`type` from image_info where image_id in ({str(set(li))[1:-1]})"
    # Map image_id -> (preview_url, signature, width, height, size, type).
    image_di = {}
    image_data = db.quchen_text.getData(sql3)
    for x in image_data:
        image_di[x[0]] = (x[1], x[2], x[3], x[4], x[5], x[6])
    # print(image_di)
    for i in data:
        preview_url = ''
        signature = ''
        width = '0'
        height = '0'
        image_id = ''
        size = ''
        image_type = ''  # renamed from `type` to avoid shadowing the builtin
        video_length = None
        video_bit_rate = None
        video_meta_data = None
        download_path = None
        for j in i[-1].split(','):
            info = image_di.get(j)
            if info:
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + info[0]
                signature = signature + ',' + info[1]
                # Only the last image's width/height survive; size and type
                # are accumulated per image like the URL fields.
                width = str(info[2])
                height = str(info[3])
                size = size + ',' + str(info[4])
                image_type = image_type + ',' + str(info[5])
            else:
                # No metadata found: keep the id but pad the other fields.
                image_id = image_id + ',' + j
                preview_url = preview_url + ',' + ' '
                signature = signature + ',' + ' '
                width = '0'
                height = '0'
                size = size + ',' + '0'
                image_type = image_type + ',' + ' '
        # Strip the leading comma and extend the row to the full column
        # order of dw_image_cost_day (the literal 0 is the is_video flag).
        i[-1] = image_id[1:]
        i.append(preview_url[1:])
        i.append(signature[1:])
        i.append(0)
        i.append(width)
        i.append(height)
        i.append(size[1:])
        i.append(image_type[1:])
        i.append(video_length)
        i.append(video_bit_rate)
        i.append(video_meta_data)
        i.append(download_path)
    # exit(0)
    # Video creatives: the same aggregation, joined against video_info
    # (is_video flag hard-coded to 1 in the select list).
    sql_video = f"""SELECT a.dt,b.type,count(*),sum(a.cost),sum(view_count),sum(click_count),sum(follow_count),sum(order_count),sum(order_amount),
    title,description,book,platform,stage,e.channel,pitcher,ifnull(image_id,''),g.preview_url,g.signature,1,
    g.width,g.height,g.`size` ,g.`type` ,g.video_length ,g.byte_rate ,g.video_meta_data,g.download_path
    from
    ad_cost_day a
    left join ad_info b on a.ad_id=b.ad_id
    left join adcreative_info c on b.adcreative_id=c.adcreative_id
    left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
    left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
    left join video_info g on c.image_id=g.video_id
    where a.dt='{dt}' and c.is_video=1
    group by a.dt,b.type,title,description,book,platform,stage,image_id,e.channel,pitcher """
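    # Unlike the image rows, these already contain all 28 columns
    # (preview_url through download_path come straight from video_info), so
    # no Python-side padding is needed before merging.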
    data_video = db.quchen_text.get_data_list(sql_video)
    data.extend(data_video)

    # Persist: clear the day's rows, then bulk-insert the rebuilt rows, so
    # re-running the same dt simply rebuilds that partition.
    db.dm.execute(f'delete from dw_image_cost_day where dt="{dt}"')
    db.dm.executeMany(
        "replace into dw_image_cost_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
        data)
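

# Entry points: hourly() refreshes the current day's partition and day()
# backfills the last 10 days; hourly() reports failures through DingTalk.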
def hourly():
    try:
        logging.info('ad data cleaning started')
        run(du.getNow())
        logging.info('ad data cleaning finished')
    except Exception:
        logging.exception('ad data cleaning failed')
        DingTalkUtils().send("ad data cleaning failed")


def day():
    logging.info('ad data cleaning started')
    for i in du.getDateLists(du.get_n_days(-10), du.get_n_days(-1)):
        # print(i)
        run(i)
    logging.info('ad data cleaning finished')


if __name__ == '__main__':
    # run('2021-05-18')
    for i in du.getDateLists(du.get_n_days(0), du.get_n_days(0)):
        print(i)
        run(i)
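
# Scheduling note: hourly() and day() are presumably driven by an external
# scheduler (e.g. cron); running the module directly replays today's
# partition once, mirroring hourly() without the DingTalk alerting.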