"""Pull daily ad-cost reports from the Tencent Ads API and store them in MySQL.

For every advertiser account found in the ``advertiser_vx`` / ``advertiser_qq``
tables the ``daily_reports/get`` endpoint is queried and the returned rows are
written to ``daily_vx`` / ``daily_qq`` with ``REPLACE INTO``.
"""
import requests
import hashlib
import time
import json
import logging
import random
from concurrent.futures import ThreadPoolExecutor
from model.DateUtils import DateUtils
from model.DataBaseUtils import MysqlUtils
from model.DingTalkUtils import DingTalkUtils
from app.api_data.tx_ad_cost.get_cost_game import get_data
from six import string_types
from six.moves.urllib.parse import urlencode, urlunparse

db = MysqlUtils()
du = DateUtils()

_API_BASE_URL = 'https://api.e.qq.com/v1.3/'
_REQUEST_TIMEOUT_SECONDS = 5
_MAX_RETRIES = 5
_RETRY_DELAY_SECONDS = 0.1
_MAX_WORKERS = 10

# Report fields requested from the API. The same order is used to build the
# row tuples, and it must match the column order of the REPLACE statements in
# mysql_insert_daily_qq / mysql_insert_daily_vx (account_id is appended last).
_QQ_REPORT_FIELDS = (
    'date', 'view_count', 'valid_click_count', 'ctr', 'cpc', 'cost',
    'web_order_count', 'web_order_rate', 'web_order_cost', 'follow_count',
    'order_amount', 'order_roi', 'platform_page_view_count',
    'web_commodity_page_view_count', 'from_follow_uv',
)
_VX_REPORT_FIELDS = (
    'date', 'cost', 'view_count', 'valid_click_count', 'ctr',
    'official_account_follow_rate', 'order_amount', 'order_roi',
    'order_count', 'order_rate', 'order_unit_price', 'web_order_cost',
    'first_day_order_amount', 'first_day_order_count',
)


def md5value(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of ``s``."""
    md5 = hashlib.md5()
    md5.update(s.encode("utf-8"))
    return md5.hexdigest()


def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0):
    """Call ``daily_reports/get`` and return the decoded JSON response.

    Only page 1 (page_size 1000) is requested. While the response ``code`` is
    non-zero the request is repeated after a short sleep until the error
    counter reaches 5; after the last failed attempt a DingTalk alert is sent
    and the failing response is returned as-is.

    :param access_token: API access token of the account.
    :param account_id: advertiser account id.
    :param st: report start date (sent as ``date_range.start_date``).
    :param et: report end date (sent as ``date_range.end_date``).
    :param level: report level string, e.g. ``REPORT_LEVEL_ADVERTISER``.
    :param fields: sequence of report field names to request.
    :param err_num: number of failures already counted (callers normally
        leave this at 0).
    :return: the response body parsed from JSON.
    :raises: network/JSON-decoding errors from ``requests`` propagate, as does
        ``KeyError`` when the response has no ``code`` key.
    """
    url = _API_BASE_URL + 'daily_reports/get'
    failures = err_num
    while True:
        # NOTE(review): the access token is written to the log (and to
        # DingTalk below) in clear text; consider masking it.
        logging.info(f'开始获取消耗数据,token:{access_token}, id:{account_id}, st:{str(st)}, et:{str(et)}')
        parameters = {
            "account_id": account_id,
            "level": level,
            "date_range": {
                "start_date": st,
                "end_date": et
            },
            "page": 1,
            "page_size": 1000,
            "fields": fields,
            # Common parameters; timestamp and nonce are regenerated for
            # every attempt.
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
        }
        # Every non-string value is sent JSON-encoded in the query string.
        parameters = {
            key: value if isinstance(value, str) else json.dumps(value)
            for key, value in parameters.items()
        }
        r = requests.get(url, params=parameters, timeout=_REQUEST_TIMEOUT_SECONDS).json()
        logging.info('account_id: {} 开始获取消耗数据'.format(account_id))
        if r['code'] == 0:
            return r

        logging.warning(
            'access_token:{} code:{} message:{}'.format(str(access_token), str(r['code']), str(r['message'])))
        if failures >= _MAX_RETRIES:
            break
        time.sleep(_RETRY_DELAY_SECONDS)
        failures += 1

    DingTalkUtils().send(
        '消耗日报请求出现问题\naccess_token:{} code:{} message:{}'.format(str(access_token), str(r['code']),
                                                                str(r['message'])))
    return r


def _append_report_rows(y, li, st, et, level, fields, row_log_prefix, channel):
    """Fetch one account's report and append one tuple per returned row to ``li``.

    Each tuple holds the values of ``fields`` in order followed by the account
    id. Any exception is logged and swallowed so that one failing account does
    not abort the whole batch.
    """
    try:
        c = daily_reports_get(y[2], y[0], st, et, level, fields)
        if 'data' in c:
            for d in c['data']['list']:
                logging.info(row_log_prefix + str(y[0]) + ' ' + str(d["cost"]))
                li.append(tuple(d[name] for name in fields) + (y[0],))
    except Exception as e:
        logging.error('{} account:{} error :{}'.format(channel, str(y), str(e)))


def get_q_data(y, li, st, et):
    """Append the QQ (``REPORT_LEVEL_ADVERTISER``) daily rows of account ``y`` to ``li``.

    :param y: account row as selected by ``get_qq_list`` (index 0 is the
        account id, index 2 the access token).
    :param li: shared list that receives the result tuples.
    :param st: report start date.
    :param et: report end date.
    """
    _append_report_rows(y, li, st, et, "REPORT_LEVEL_ADVERTISER", _QQ_REPORT_FIELDS, 'qq: ', 'qq')


def get_v_data(y, li, st, et):
    """Append the WeChat (``REPORT_LEVEL_ADVERTISER_WECHAT``) daily rows of account ``y`` to ``li``.

    :param y: account row as selected by ``get_vx_list`` (index 0 is the
        account id, index 2 the access token).
    :param li: shared list that receives the result tuples.
    :param st: report start date.
    :param et: report end date.
    """
    _append_report_rows(y, li, st, et, "REPORT_LEVEL_ADVERTISER_WECHAT", _VX_REPORT_FIELDS, 'vx:', 'vx')


def get_vx_game_list():
    """Return the ``advertiser_vx`` accounts that have a token and ``is_game=1``."""
    sql = '''select account_id,wechat_account_id,access_token,refresh_token,name,
                ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'')
                from advertiser_vx where access_token is not null and is_game=1 '''
    return db.quchen_text.getData(sql)


def get_vx_list():
    """Return the ``advertiser_vx`` accounts that have a token and ``is_game=0``."""
    sql = '''select account_id,wechat_account_id,access_token,refresh_token,name,
                ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'')
                from advertiser_vx where access_token is not null and is_game=0 '''
    return db.quchen_text.getData(sql)


def get_qq_list():
    """Return all ``advertiser_qq`` accounts (same column layout as the vx lists)."""
    sql = "select account_id,'',access_token,refresh_token,name," \
          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
    return db.quchen_text.getData(sql)


def mysql_insert_daily_vx(data):
    """Write WeChat daily rows to ``daily_vx`` with ``REPLACE INTO``.

    :param data: sequence of 15-item tuples in ``_VX_REPORT_FIELDS`` order
        followed by the account id.
    """
    logging.info('start save daily_vx info')
    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,
    order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)
     values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(b, data)
    logging.info('end save daily_vx info')


def mysql_insert_daily_qq(data):
    """Write QQ daily rows to ``daily_qq`` with ``REPLACE INTO``.

    :param data: sequence of 16-item tuples in ``_QQ_REPORT_FIELDS`` order
        followed by the account id.
    """
    a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,
    web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,
    from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(a, data)


def _collect_rows(worker, accounts, st, et):
    """Run ``worker(account, rows, st, et)`` for every account on a thread pool.

    Returns the shared ``rows`` list once all workers have finished. Exceptions
    that escape a worker are logged instead of being silently discarded with
    the future.
    """
    rows = []
    # The with-block waits for all submitted tasks and always shuts the pool
    # down, even if submitting fails part-way through.
    with ThreadPoolExecutor(max_workers=_MAX_WORKERS) as executor:
        submitted = [(account, executor.submit(worker, account, rows, st, et)) for account in accounts]
    for account, future in submitted:
        error = future.exception()
        if error is not None:
            logging.error('{} failed for account:{} error :{}'.format(
                getattr(worker, '__name__', str(worker)), str(account[0]), str(error)))
    return rows


def get_daily_vx(st, et):
    """Fetch the daily report of every non-game WeChat account and store it in ``daily_vx``."""
    token_list_v = get_vx_list()
    logging.info("获取vx账号:" + str(len(token_list_v)))
    time1 = time.time()
    li = _collect_rows(get_v_data, token_list_v, st, et)
    logging.info('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
    mysql_insert_daily_vx(li)


def get_daily_vx_game(st, et):
    """Fetch the daily report of every WeChat game account (via ``get_data``) and store it in ``daily_vx``."""
    token_list_v = get_vx_game_list()
    logging.info("获取vx_game账号:" + str(len(token_list_v)))
    time1 = time.time()
    li = _collect_rows(get_data, token_list_v, st, et)
    logging.info('get_daily_vx_game:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
    mysql_insert_daily_vx(li)


def get_daily_qq(st, et):
    """Fetch the daily report of every QQ account and store it in ``daily_qq``."""
    token_list_q = get_qq_list()
    logging.info("获取qq账号:" + str(len(token_list_q)))
    time1 = time.time()
    li = _collect_rows(get_q_data, token_list_q, st, et)
    logging.info('get_qq_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
    mysql_insert_daily_qq(li)


def run(st, et):
    """Pull WeChat and WeChat-game cost data for the date range ``st``..``et``."""
    logging.info('微信消耗数据拉取,开始')
    get_daily_vx(st, et)
    logging.info('微信消耗数据拉取,结束')

    logging.info('微信游戏消耗数据拉取,开始')
    get_daily_vx_game(st, et)
    logging.info('微信游戏消耗数据拉取,结束')

    # The QQ pull is currently disabled:
    # logging.info('qq消耗数据拉取,开始')
    # get_daily_qq(st, et)
    # logging.info('qq消耗数据拉取,结束')


def old_cost_hourly():
    """Pull cost data with start and end both set to ``du.getNow()``."""
    st = et = du.getNow()
    logging.info('消耗数据拉取,开始')
    run(st, et)
    logging.info('消耗数据拉取,结束')


def old_cost_daily():
    """Pull cost data from ``du.get_n_days(-10)`` to ``du.get_n_days(-1)``."""
    st = du.get_n_days(-10)
    et = du.get_n_days(-1)
    run(st, et)


if __name__ == '__main__':
    # run()
    # old_cost_daily()
    st = du.get_n_days(-10)
    et = du.get_n_days(0)
    run(st, et)