# get_cost_older.py
import hashlib
import json
import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from six import string_types
from six.moves.urllib.parse import urlencode, urlunparse

from model.DataBaseUtils import MysqlUtils
from model.DateUtils import DateUtils
from model.DingTalkUtils import DingTalkUtils
  13. db = MysqlUtils()
  14. du = DateUtils()
  15. def md5value(s):
  16. md5 = hashlib.md5()
  17. md5.update(s.encode("utf-8"))
  18. return md5.hexdigest()
  19. def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0):
  20. logging.info(f'开始获取消耗数据,token:{access_token}, id:{account_id}, st:{str(st)}, et:{str(et)}')
  21. interface = 'daily_reports/get'
  22. url = 'https://api.e.qq.com/v1.1/' + interface
  23. common_parameters = {
  24. 'access_token': access_token,
  25. 'timestamp': int(time.time()),
  26. 'nonce': str(time.time()) + str(random.randint(0, 999999)),
  27. }
  28. parameters = {
  29. "account_id": account_id,
  30. "level": level,
  31. "date_range":
  32. {
  33. "start_date": st,
  34. "end_date": et
  35. },
  36. "page": 1,
  37. "page_size": 1000,
  38. "fields": fields
  39. }
  40. parameters.update(common_parameters)
  41. for k in parameters:
  42. if type(parameters[k]) is not str:
  43. parameters[k] = json.dumps(parameters[k])
  44. r = requests.get(url, params=parameters, timeout=5).json()
  45. logging.info('account_id: {} 开始获取消耗数据'.format(account_id))
  46. if r['code'] != 0:
  47. logging.warning(
  48. 'access_token:{} code:{} message:{}'.format(str(access_token), str(r['code']), str(r['message'])))
  49. if err_num < 5:
  50. time.sleep(0.1)
  51. return daily_reports_get(access_token, account_id, st, et, level, fields, err_num=err_num + 1)
  52. DingTalkUtils().send(
  53. '消耗日报请求出现问题\naccess_token:{} code:{} message:{}'.format(str(access_token), str(r['code']),
  54. str(r['message'])))
  55. return r
  56. def get_q_data(y, li, st, et):
  57. c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER", (
  58. 'date', 'view_count', 'valid_click_count', 'ctr', 'cpc', 'cost', 'web_order_count', 'web_order_rate',
  59. 'web_order_cost', 'follow_count', 'order_amount', 'order_roi', 'platform_page_view_count',
  60. 'web_commodity_page_view_count', 'from_follow_uv'))
  61. if 'data' in c.keys() and len(c["data"]["list"]) > 0:
  62. for d in c['data']['list']:
  63. d['account_id'] = y[0]
  64. logging.info('qq: ' + str(d['account_id']) + str(d["cost"]))
  65. x = tuple(d.values())
  66. li.append(x)
  67. def get_v_data(y, li, st, et):
  68. c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER_WECHAT", (
  69. 'date', 'cost', 'view_count', 'valid_click_count', 'ctr', 'official_account_follow_rate', 'order_amount',
  70. 'order_roi', 'order_count', 'order_rate', 'order_unit_price', 'web_order_cost', 'first_day_order_amount',
  71. 'first_day_order_count'))
  72. if 'data' in c.keys() and len(c["data"]["list"]) > 0:
  73. for d in c['data']['list']:
  74. d['account_id'] = y[0]
  75. logging.info('vx:' + str(d['account_id']) + str(d["cost"]))
  76. x = tuple(d.values())
  77. li.append(x)
  78. def get_tt_data(account_info, li, st, et):
  79. def build_url_ad(path, query=""):
  80. # type: (str, str) -> str
  81. """
  82. Build request URL
  83. :param path: Request path
  84. :param query: Querystring
  85. :return: Request URL
  86. """
  87. scheme, netloc = "https", "ad.oceanengine.com"
  88. return urlunparse((scheme, netloc, path, "", query, ""))
  89. page_num = 1
  90. advertiser_ids = account_info[1]
  91. for advertiser_id in advertiser_ids:
  92. while True:
  93. # account_info
  94. my_args = {
  95. "start_date": st,
  96. "end_date": et,
  97. "page_size": 100,
  98. "page": page_num,
  99. # "agent_id" : "1708974248093789",
  100. 'advertiser_id': advertiser_id,
  101. # "start_date": "%s"
  102. }
  103. PATH = "/open_api/2/report/advertiser/get/"
  104. args = json.loads(json.dumps(my_args))
  105. query_string = urlencode({k: v if isinstance(v, string_types) else json.dumps(v) for k, v in args.items()})
  106. url = build_url_ad(PATH, query_string)
  107. headers = {
  108. "Access-Token": account_info[0],
  109. }
  110. rsp = requests.get(url, headers=headers)
  111. '''
  112. date,cost,view_count,valid_click_count,
  113. ctr,official_account_follow_rate,order_amount,
  114. order_roi,order_count,order_rate,order_unit_price,
  115. web_order_cost,first_day_order_amount,first_day_order_count,account_id
  116. '''
  117. # print(account_info)
  118. # print(rsp.text)
  119. result = rsp.json()
  120. for _ in result['data']['list']:
  121. campaign_info = (_['stat_datetime'][:10], _['cost'] * 100, _['show'], _['click'],
  122. _['ctr'], None, None,
  123. None, None, None, None,
  124. None, None, None, advertiser_id)
  125. li.append(campaign_info)
  126. total_page = result['data']['page_info']['total_page']
  127. if page_num > total_page or page_num == total_page:
  128. break
  129. else:
  130. page_num = page_num + 1
  131. def get_vx_list():
  132. sql = "select account_id,wechat_account_id,access_token,refresh_token,name," \
  133. "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
  134. a = db.quchen_text.getData(sql)
  135. return a
  136. def get_qq_list():
  137. sql = "select account_id,'',access_token,refresh_token,name," \
  138. "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
  139. a = db.quchen_text.getData(sql)
  140. return a
  141. def mysql_insert_daily_vx(data):
  142. b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
  143. order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
  144. values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  145. db.quchen_text.executeMany(b, data)
  146. def mysql_insert_daily_qq(data):
  147. a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
  148. web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
  149. from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  150. db.quchen_text.executeMany(a, data)
  151. def mysql_insert_daily_tt(data):
  152. b = """replace into daily_tt (date,cost,view_count,valid_click_count,ctr,
  153. official_account_follow_rate,order_amount,
  154. order_roi,order_count,order_rate,order_unit_price,
  155. web_order_cost,first_day_order_amount,
  156. first_day_order_count,account_id)
  157. values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
  158. db.quchen_text.executeMany(b, data)
  159. def get_daily_vx(st, et):
  160. token_list_v = get_vx_list()
  161. logging.info("获取vx账号:" + str(token_list_v.__len__()))
  162. time1 = time.time()
  163. executor = ThreadPoolExecutor(max_workers=10)
  164. li = []
  165. for y in token_list_v:
  166. executor.submit(get_v_data, y, li, st, et)
  167. executor.shutdown()
  168. logging.info('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
  169. mysql_insert_daily_vx(li)
  170. def get_daily_qq(st, et):
  171. token_list_q = get_qq_list()
  172. logging.info("获取qq账号:" + str(token_list_q.__len__()))
  173. time1 = time.time()
  174. li = []
  175. executor = ThreadPoolExecutor(max_workers=10)
  176. for x in token_list_q:
  177. executor.submit(get_q_data, x, li, st, et)
  178. executor.shutdown()
  179. logging.info('get_qq_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
  180. mysql_insert_daily_qq(li)
  181. def get_daily_tt(st, et):
  182. def refresh_access_token(appid, account_id, secret, refresh_token):
  183. open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
  184. uri = "oauth2/refresh_token/"
  185. refresh_token_url = open_api_url_prefix + uri
  186. data = {
  187. "appid": appid,
  188. "secret": secret,
  189. "grant_type": "refresh_token",
  190. "refresh_token": refresh_token,
  191. }
  192. rsp = requests.post(refresh_token_url, json=data)
  193. # print(rsp.text)
  194. rsp_data = rsp.json()
  195. new_refresh_token = rsp_data['data']['refresh_token']
  196. new_access_token = rsp_data['data']['access_token']
  197. sql = f'''
  198. update bytedance_login_info
  199. set refresh_token='{new_refresh_token}' ,access_token='{new_access_token}'
  200. where appid='{appid}' and account_id='{account_id}'
  201. '''
  202. db.quchen_text.execute(sql)
  203. return rsp_data['data']['access_token']
  204. # 1.获取refresh_token
  205. sql = '''
  206. select appid,account_id,secret,refresh_token from bytedance_login_info
  207. '''
  208. accounts_info = db.quchen_text.getData(sql)
  209. # 2.刷新refresh_token,并获取最新的access_token
  210. for account_info in accounts_info:
  211. appid, account_id, secret, refresh_token = account_info
  212. access_token = refresh_access_token(appid, account_id, secret, refresh_token)
  213. # 3.获取agent_id
  214. sql = f'''
  215. select distinct(advertiser_id) from bytedance_pitcher_change
  216. where appid='{appid}' and account_id='{account_id}'
  217. '''
  218. advertiser_ids = db.quchen_text.getData(sql)
  219. logging.info("获取头条账号:" + str(advertiser_ids.__len__()))
  220. advertiser_ids = [_[0] for _ in advertiser_ids]
  221. # token,adv_ids
  222. account_info = (access_token, advertiser_ids)
  223. time1 = time.time()
  224. li = []
  225. get_tt_data(account_info, li, st, et)
  226. logging.info('get_tt_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
  227. mysql_insert_daily_tt(li)
  228. def get_token_bytedance():
  229. #添加bytedance账号,需要添加一下access_token
  230. open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
  231. uri = "oauth2/access_token/"
  232. url = open_api_url_prefix + uri
  233. data = {
  234. "app_id": 1709866698360883,
  235. "secret": "****",
  236. "grant_type": "auth_code",
  237. "auth_code": "********"
  238. }
  239. rsp = requests.post(url, json=data)
  240. rsp_data = rsp.json()
  241. return rsp_data['data']['access_token']
  242. def run(st, et):
  243. logging.info('微信消耗数据拉取,开始')
  244. get_daily_vx(st, et)
  245. logging.info('微信消耗数据拉取,结束')
  246. logging.info('qq消耗数据拉取,开始')
  247. get_daily_qq(st, et)
  248. logging.info('qq消耗数据拉取,结束')
  249. logging.info('头条消耗数据拉取,开始')
  250. get_daily_tt(st, et)
  251. logging.info('头条消耗数据拉取,结束')
  252. def old_cost_hourly():
  253. st = et = du.getNow()
  254. logging.info('消耗数据拉取,开始')
  255. run(st, et)
  256. logging.info('消耗数据拉取,结束')
  257. def old_cost_daily():
  258. st = du.get_n_days(-10)
  259. et = du.get_n_days(-1)
  260. run(st, et)
  261. if __name__ == '__main__':
  262. # run()
  263. # old_cost_daily()
  264. st = du.get_n_days(-30)
  265. et = du.get_n_days(0)
  266. print(st, et)
  267. get_daily_tt(st, et)