get_ad_cost_min.py

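"""Continuously pull per-ad daily cost reports for WeChat MP ads from the
Tencent Marketing API (api.e.qq.com), aggregate them per ad, and upsert the
rows into the ad_cost_day table; after each pass, dw_image_cost_day is
refreshed and the loop sleeps for a few minutes before repeating."""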
from logging import handlers
from model.DateUtils import DateUtils
import time
import logging
from app.api_data.tx_ad_cost.get_cost import ad_cost_day, get_accounts
from app.etl.dw import dw_image_cost_day
from model.sql_models import DB
from config import using_config
from model.DataBaseUtils import MysqlUtils
import random
import pandas
import json
import requests
from concurrent.futures import ThreadPoolExecutor
import threading
du = DateUtils()
db = MysqlUtils()
qucheng_db = DB(config=using_config.quchen_text)


def ad_cost_day_mp(account_id, access_token, st, et):
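    """Fetch the daily report rows for one MP (WeChat) account between st and et.

    Pages through daily_reports/get, keeps rows with non-zero cost, aggregates
    them per (date, ad_id, account_id, channel) and writes the result into
    ad_cost_day via `replace into`.
    """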
    # API docs: https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'valid_click_count',
              'official_account_follow_count', 'order_count', 'order_amount')
    li = []
    page = 1
    total_page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "level": 'REPORT_LEVEL_AD_WECHAT',
            "page": page,
            "page_size": 1000,
            "date_range": {
                "start_date": st,
                "end_date": et
            }
        }
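        # Non-string parameters (the fields tuple, date_range dict, page numbers)
        # are JSON-encoded so they can be sent as flat query-string values.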
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            r = requests.get(url, params=parameters, timeout=5)
            r = r.json()
            # import pandas as pd
            # logging.info(pd.DataFrame(r['data']['list']))
            code = r['code']
            if code == 11017:  # treated as "request limit reached": back off for a minute and retry
                time.sleep(61)
            else:
                break
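        # cost and order_amount appear to be returned in cents, hence the /100 below;
        # zero-cost rows are skipped.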
        if r.get("data"):
            for i in r['data']['list']:
                if i['cost'] > 0:
                    li.append(
                        (
                            i['date'], i['ad_id'], i['adgroup_id'],
                            i['cost'] / 100, i['view_count'],
                            i['valid_click_count'],
                            i['official_account_follow_count'],
                            i['order_count'], i['order_amount'] / 100, account_id,
                            'MP'
                        )
                    )
            # print(r)
            total_page = r['data']['page_info']['total_page']
        if page >= total_page:
            break
        else:
            page += 1
    # logging.info(li)
    # exit()
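    # Collapse the per-adgroup rows: group by (date, ad_id, account_id, channel),
    # sum the numeric columns, and keep the adgroup_ids as a comma-joined string.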
    if len(li) > 0:
        # TODO: ask what adgroup_id / campaign_id are used for
        # cross-check against the ad-level data
        li_df = pandas.DataFrame(li)
        li_df_g = li_df.groupby([0, 1, 9, 10])
        li_new = []
        adgroup_id_dict = {}
        for index, group in li_df_g:
            adgroup_id_dict[index] = ','.join([str(i) for i in group[2].tolist()])
        for index, row in li_df_g.agg('sum').iterrows():
            new_row = row.tolist()
            new_row = list(index[0:2]) + new_row + list(index[2:])
            new_row[2] = adgroup_id_dict[index]
            li_new.append(tuple(new_row))
        logging.info(f"{account_id} has {len(li_new)} ad cost rows")
        # db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,'
        #                            '%s,%s,%s,%s,%s,%s)', li_new)
        qc_session = qucheng_db.DBSession()
        for _ in li_new:
            # NOTE: values are interpolated directly into the SQL string; parameter binding would be safer.
            qc_session.execute(
                '''replace into ad_cost_day values('{}',{},'{}',{},{},{},{},{},{},'{}','{}')'''.format(*_))
        qc_session.commit()


def get_ad_data():
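    """Fetch today's cost for every account that recently had spend.

    The account list comes from ad_cost_day (note: the filter is currently
    pinned to dt='2021-08-26', limit 800); the per-account fetches run in a
    ThreadPoolExecutor.
    """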
    max_workers = 100
    executor = ThreadPoolExecutor(max_workers=max_workers)
    st = du.get_n_days(0)
    et = du.get_n_days(0)
    sql = '''select distinct(account_id) as d_account_id from ad_cost_day acd
             where dt='2021-08-26'
             order by cost desc
             limit 800'''
    accounts_use = db.quchen_text.getData(sql)
    accounts_use_set = set()
    for accounts in accounts_use:
        accounts_use_set.add(accounts[0])
    thread_list = []
    for account in get_accounts():
        if account[0] in accounts_use_set:
            # ad_cost_day_mp(account[0], account[1], st, et)
            # one = threading.Thread(target=ad_cost_day_mp, args=(account[0], account[1], st, et))
            # one.start()
            # thread_list.append(one)
            thread_tmp = executor.submit(ad_cost_day_mp, account[0], account[1], st, et)
            thread_list.append(thread_tmp)
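    # Busy-wait until every submitted future has finished
    # (concurrent.futures.wait or as_completed would avoid the polling).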
    for _ in thread_list:
        while True:
            if _.done():
                break
            time.sleep(0.1)


def get_data():
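    """Main loop: fetch ad cost, refresh dw_image_cost, sleep, repeat.

    The MySQL connection is recycled after every pass; on failure the loop
    logs the error and retries after a minute.
    """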
    global db
    while True:
        try:
            # 1. fetch the data
            # 2. refresh dw_image_cost
            # 3. sleep
            logging.info('ad cost fetch started')
            # ad_cost_day(du.get_n_days(0), du.get_n_days(0))
            get_ad_data()
            logging.info('ad cost fetch finished')
            logging.info('dw_image_cost refresh started')
            dw_image_cost_day.run(du.get_n_days(0))
            logging.info('dw_image_cost refresh finished')
            time.sleep(60 * 3)
            # recycle the MySQL connection between passes
            db.close()
            db = MysqlUtils()
        except Exception as e:
            # log the failure and retry after a minute
            logging.exception(e)
            time.sleep(60)


if __name__ == '__main__':
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('./get_ad_cost_min.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler()  # also echo logs to the console
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    get_data()