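"""Daily ad-spend collector.

Pulls daily consumption reports from the Tencent Marketing API (QQ and WeChat
advertiser accounts) and the Oceanengine (Bytedance) report API, then writes
the rows into the MySQL tables daily_qq, daily_vx and daily_tt.
"""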
import requests
import hashlib
import time
import json
import logging
import random
from concurrent.futures import ThreadPoolExecutor

from model.DateUtils import DateUtils
from model.DataBaseUtils import MysqlUtils
from model.DingTalkUtils import DingTalkUtils
from six import string_types
from six.moves.urllib.parse import urlencode, urlunparse

db = MysqlUtils()
du = DateUtils()

def md5value(s):
    """Return the hex MD5 digest of a UTF-8 string."""
    md5 = hashlib.md5()
    md5.update(s.encode("utf-8"))
    return md5.hexdigest()
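
# --- Tencent Marketing API (api.e.qq.com) ---
# daily_reports/get returns advertiser-level daily reports; the request carries
# access_token / timestamp / nonce as common parameters, and every non-string
# value must be JSON-encoded before it goes into the query string.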
def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0):
    logging.info(f'start fetching spend data, token:{access_token}, id:{account_id}, st:{str(st)}, et:{str(et)}')
    interface = 'daily_reports/get'
    url = 'https://api.e.qq.com/v1.1/' + interface
    common_parameters = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
    }
    parameters = {
        "account_id": account_id,
        "level": level,
        "date_range": {
            "start_date": st,
            "end_date": et
        },
        "page": 1,
        "page_size": 1000,
        "fields": fields
    }
    parameters.update(common_parameters)
    for k in parameters:
        if not isinstance(parameters[k], str):
            parameters[k] = json.dumps(parameters[k])

    r = requests.get(url, params=parameters, timeout=5).json()
    logging.info('account_id: {} fetching spend data'.format(account_id))
    if r['code'] != 0:
        logging.warning(
            'access_token:{} code:{} message:{}'.format(str(access_token), str(r['code']), str(r['message'])))
        # retry a few times before alerting via DingTalk
        if err_num < 5:
            time.sleep(0.1)
            return daily_reports_get(access_token, account_id, st, et, level, fields, err_num=err_num + 1)
        DingTalkUtils().send(
            'daily spend report request failed\naccess_token:{} code:{} message:{}'.format(
                str(access_token), str(r['code']), str(r['message'])))
    return r
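
# Both helpers below flatten each report row with tuple(d.values()) and append
# account_id last, so the tuple order has to line up with the column order of
# the REPLACE INTO statements further down; this assumes the API echoes the
# fields back in the requested order.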
def get_q_data(y, li, st, et):
    """Collect QQ advertiser rows; y[0] is account_id, y[2] is access_token."""
    c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER", (
        'date', 'view_count', 'valid_click_count', 'ctr', 'cpc', 'cost', 'web_order_count', 'web_order_rate',
        'web_order_cost', 'follow_count', 'order_amount', 'order_roi', 'platform_page_view_count',
        'web_commodity_page_view_count', 'from_follow_uv'))
    if 'data' in c.keys() and len(c["data"]["list"]) > 0:
        for d in c['data']['list']:
            d['account_id'] = y[0]
            logging.info('qq: ' + str(d['account_id']) + str(d["cost"]))
            x = tuple(d.values())
            li.append(x)

def get_v_data(y, li, st, et):
    """Collect WeChat advertiser rows; y[0] is account_id, y[2] is access_token."""
    c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER_WECHAT", (
        'date', 'cost', 'view_count', 'valid_click_count', 'ctr', 'official_account_follow_rate', 'order_amount',
        'order_roi', 'order_count', 'order_rate', 'order_unit_price', 'web_order_cost', 'first_day_order_amount',
        'first_day_order_count'))
    if 'data' in c.keys() and len(c["data"]["list"]) > 0:
        for d in c['data']['list']:
            d['account_id'] = y[0]
            logging.info('vx:' + str(d['account_id']) + str(d["cost"]))
            x = tuple(d.values())
            li.append(x)
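
# --- Oceanengine (Bytedance) report API ---
# /open_api/2/report/advertiser/get/ is paged: each request sends advertiser_id,
# page and page_size with the Access-Token header, and the loop advances until
# page_info.total_page is reached.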
def get_tt_data(account_info, li, st, et):
    def build_url_ad(path, query=""):
        # type: (str, str) -> str
        """
        Build request URL
        :param path: Request path
        :param query: Querystring
        :return: Request URL
        """
        scheme, netloc = "https", "ad.oceanengine.com"
        return urlunparse((scheme, netloc, path, "", query, ""))

    advertiser_ids = account_info[1]
    for advertiser_id in advertiser_ids:
        page_num = 1  # reset paging for every advertiser
        while True:
            my_args = {
                "start_date": st,
                "end_date": et,
                "page_size": 100,
                "page": page_num,
                # "agent_id": "1708974248093789",
                'advertiser_id': advertiser_id,
            }
            PATH = "/open_api/2/report/advertiser/get/"
            query_string = urlencode(
                {k: v if isinstance(v, string_types) else json.dumps(v) for k, v in my_args.items()})
            url = build_url_ad(PATH, query_string)
            headers = {
                "Access-Token": account_info[0],
            }
            rsp = requests.get(url, headers=headers)
            result = rsp.json()
            # target column order:
            # date, cost, view_count, valid_click_count,
            # ctr, official_account_follow_rate, order_amount,
            # order_roi, order_count, order_rate, order_unit_price,
            # web_order_cost, first_day_order_amount, first_day_order_count, account_id
            for _ in result['data']['list']:
                # cost is scaled by 100, presumably to match the unit used by the other sources
                campaign_info = (_['stat_datetime'][:10], _['cost'] * 100, _['show'], _['click'],
                                 _['ctr'], None, None,
                                 None, None, None, None,
                                 None, None, None, advertiser_id)
                li.append(campaign_info)
            total_page = result['data']['page_info']['total_page']
            if page_num >= total_page:
                break
            page_num += 1

def get_vx_list():
    sql = "select account_id,wechat_account_id,access_token,refresh_token,name," \
          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
    a = db.quchen_text.getData(sql)
    return a


def get_qq_list():
    sql = "select account_id,'',access_token,refresh_token,name," \
          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
    a = db.quchen_text.getData(sql)
    return a
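
# All three writers use REPLACE INTO, so re-running the job for a date range
# overwrites existing rows rather than duplicating them (assuming the tables
# key on date + account_id).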
def mysql_insert_daily_vx(data):
    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
        order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(b, data)


def mysql_insert_daily_qq(data):
    a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
        web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
        from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(a, data)


def mysql_insert_daily_tt(data):
    b = """replace into daily_tt (date,cost,view_count,valid_click_count,ctr,
        official_account_follow_rate,order_amount,
        order_roi,order_count,order_rate,order_unit_price,
        web_order_cost,first_day_order_amount,
        first_day_order_count,account_id)
        values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(b, data)
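
# Each account is fetched on its own worker thread; executor.shutdown() blocks
# until all submitted tasks finish, and appending to the shared list from the
# workers is safe because list.append is atomic under CPython's GIL.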
def get_daily_vx(st, et):
    token_list_v = get_vx_list()
    logging.info("got vx accounts: " + str(len(token_list_v)))
    time1 = time.time()
    executor = ThreadPoolExecutor(max_workers=10)
    li = []
    for y in token_list_v:
        executor.submit(get_v_data, y, li, st, et)
    executor.shutdown()
    logging.info('get_daily_vx: ' + str(len(li)) + ' cost: ' + str(int(time.time() - time1)))
    mysql_insert_daily_vx(li)


def get_daily_qq(st, et):
    token_list_q = get_qq_list()
    logging.info("got qq accounts: " + str(len(token_list_q)))
    time1 = time.time()
    li = []
    executor = ThreadPoolExecutor(max_workers=10)
    for x in token_list_q:
        executor.submit(get_q_data, x, li, st, et)
    executor.shutdown()
    logging.info('get_qq_order: ' + str(len(li)) + ' cost: ' + str(int(time.time() - time1)))
    mysql_insert_daily_qq(li)
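
# --- Oceanengine OAuth refresh ---
# oauth2/refresh_token/ returns a fresh refresh_token (persisted back to
# bytedance_login_info) together with the access_token used for the report
# requests made in get_tt_data.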
def get_daily_tt(st, et):
    def refresh_access_token(appid, secret, refresh_token):
        open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
        uri = "oauth2/refresh_token/"
        refresh_token_url = open_api_url_prefix + uri
        data = {
            "appid": appid,
            "secret": secret,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        rsp = requests.post(refresh_token_url, json=data)
        rsp_data = rsp.json()
        new_refresh_token = rsp_data['data']['refresh_token']
        sql = f'''
            update bytedance_login_info
            set refresh_token='{new_refresh_token}'
            where appid='{appid}'
        '''
        db.quchen_text.execute(sql)
        return rsp_data['data']['access_token']

    # 1. read the stored credentials for every Bytedance app
    sql = '''
        select appid,secret,refresh_token from bytedance_login_info
    '''
    accounts_info = db.quchen_text.getData(sql)
    # 2. rotate the refresh_token and obtain the latest access_token
    for account_info in accounts_info:
        appid, secret, refresh_token = account_info
        access_token = refresh_access_token(appid, secret, refresh_token)
        # 3. look up the advertiser ids bound to this app
        sql = f'''
            select distinct(advertiser_id) from bytedance_pitcher_change
            where appid='{appid}'
        '''
        advertiser_ids = db.quchen_text.getData(sql)
        logging.info("got toutiao advertisers: " + str(len(advertiser_ids)))
        advertiser_ids = [_[0] for _ in advertiser_ids]
        # (access_token, advertiser_ids) for this app
        account_info = (access_token, advertiser_ids)
        time1 = time.time()
        li = []
        get_tt_data(account_info, li, st, et)
        logging.info('get_tt_order: ' + str(len(li)) + ' cost: ' + str(int(time.time() - time1)))
        mysql_insert_daily_tt(li)

def run(st, et):
    logging.info('WeChat spend pull: start')
    get_daily_vx(st, et)
    logging.info('WeChat spend pull: done')
    logging.info('qq spend pull: start')
    get_daily_qq(st, et)
    logging.info('qq spend pull: done')
    logging.info('Toutiao spend pull: start')
    get_daily_tt(st, et)
    logging.info('Toutiao spend pull: done')

def old_cost_hourly():
    st = et = du.getNow()
    logging.info('spend pull: start')
    run(st, et)
    logging.info('spend pull: done')


def old_cost_daily():
    st = du.get_n_days(-10)
    et = du.get_n_days(-1)
    run(st, et)


if __name__ == '__main__':
    # run()
    # old_cost_daily()
    st = du.get_n_days(-30)
    et = du.get_n_days(0)
    print(st, et)
    get_daily_tt(st, et)