import requests
import hashlib
import time
import json
import logging
import random
from concurrent.futures import ThreadPoolExecutor
from model.DateUtils import DateUtils
from model.DataBaseUtils import MysqlUtils
from model.DingTalkUtils import DingTalkUtils
from six import string_types
from six.moves.urllib.parse import urlencode, urlunparse

db = MysqlUtils()
du = DateUtils()


def md5value(s):
    """Return the hex MD5 digest of *s* (UTF-8 encoded)."""
    md5 = hashlib.md5()
    md5.update(s.encode("utf-8"))
    return md5.hexdigest()


def daily_reports_get(access_token, account_id, st, et, level, fields, err_num=0):
    """Fetch daily consumption reports from the Tencent ads API (api.e.qq.com).

    Retries up to 5 times on a non-zero response ``code`` (0.1 s pause between
    attempts); after the final failure it alerts via DingTalk and returns the
    failed response unchanged, so callers must still check ``r['code']``.

    :param access_token: Tencent ads API access token.
    :param account_id: advertiser account id.
    :param st: report start date.
    :param et: report end date.
    :param level: report level, e.g. ``"REPORT_LEVEL_ADVERTISER"``.
    :param fields: iterable of report field names to request.
    :param err_num: internal retry counter — do not pass explicitly.
    :return: decoded JSON response as a dict.
    """
    logging.info(f'开始获取消耗数据,token:{access_token}, id:{account_id}, st:{str(st)}, et:{str(et)}')
    interface = 'daily_reports/get'
    url = 'https://api.e.qq.com/v1.1/' + interface
    common_parameters = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        # nonce only needs to be unique per request
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
    }
    parameters = {
        "account_id": account_id,
        "level": level,
        "date_range":
            {
                "start_date": st,
                "end_date": et
            },
        "page": 1,
        "page_size": 1000,
        "fields": fields
    }
    parameters.update(common_parameters)
    # The API expects every query parameter as a string; JSON-encode anything
    # that is not already one (ints, dicts, tuples).
    for k in parameters:
        if not isinstance(parameters[k], str):
            parameters[k] = json.dumps(parameters[k])

    r = requests.get(url, params=parameters, timeout=5).json()
    logging.info('account_id: {} 开始获取消耗数据'.format(account_id))
    if r['code'] != 0:
        logging.warning(
            'access_token:{} code:{} message:{}'.format(str(access_token), str(r['code']), str(r['message'])))
        if err_num < 5:
            time.sleep(0.1)
            return daily_reports_get(access_token, account_id, st, et, level, fields, err_num=err_num + 1)
        DingTalkUtils().send(
            '消耗日报请求出现问题\naccess_token:{} code:{} message:{}'.format(str(access_token), str(r['code']),
                                                                    str(r['message'])))
    return r


def get_q_data(y, li, st, et):
    """Fetch qq-channel daily reports for one account and append rows to *li*.

    :param y: account tuple; y[0] is account_id, y[2] is access_token.
    :param li: shared output list; one tuple per report row is appended
               (list.append is atomic in CPython, so this is safe from the
               thread pool in get_daily_qq).
    :param st: report start date.
    :param et: report end date.
    """
    try:
        c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER", (
            'date', 'view_count', 'valid_click_count', 'ctr', 'cpc', 'cost', 'web_order_count', 'web_order_rate',
            'web_order_cost', 'follow_count', 'order_amount', 'order_roi', 'platform_page_view_count',
            'web_commodity_page_view_count', 'from_follow_uv'))
        if 'data' in c and len(c["data"]["list"]) > 0:
            for d in c['data']['list']:
                d['account_id'] = y[0]
                logging.info('qq: ' + str(d['account_id']) + str(d["cost"]))
                # Column order must match mysql_insert_daily_qq's INSERT.
                res_data = [d['date'], d['view_count'], d['valid_click_count'], d['ctr'], d['cpc'], d['cost'],
                            d['web_order_count'], d['web_order_rate'],
                            d['web_order_cost'], d['follow_count'], d['order_amount'], d['order_roi'],
                            d['platform_page_view_count'], d['web_commodity_page_view_count'],
                            d['from_follow_uv'], d['account_id']]
                li.append(tuple(res_data))
    except Exception as e:
        # Best-effort per-account fetch: log and continue with other accounts.
        logging.error('qq account:{}   error :{}'.format(str(y), str(e)))


def get_v_data(y, li, st, et):
    """Fetch wechat-channel daily reports for one account and append rows to *li*.

    :param y: account tuple; y[0] is account_id, y[2] is access_token.
    :param li: shared output list; one tuple per report row is appended.
    :param st: report start date.
    :param et: report end date.
    """
    try:
        c = daily_reports_get(y[2], y[0], st, et, "REPORT_LEVEL_ADVERTISER_WECHAT", (
            'date', 'cost', 'view_count', 'valid_click_count', 'ctr', 'official_account_follow_rate', 'order_amount',
            'order_roi', 'order_count', 'order_rate', 'order_unit_price', 'web_order_cost', 'first_day_order_amount',
            'first_day_order_count'))
        if 'data' in c and len(c["data"]["list"]) > 0:
            for d in c['data']['list']:
                d['account_id'] = y[0]
                logging.info('vx:' + str(d['account_id']) + ' ' + str(d["cost"]))
                # Column order must match mysql_insert_daily_vx's INSERT.
                res_data = [d['date'], d['cost'], d['view_count'], d['valid_click_count'], d['ctr'],
                            d['official_account_follow_rate'],
                            d['order_amount'], d['order_roi'], d['order_count'], d['order_rate'],
                            d['order_unit_price'], d['web_order_cost'], d['first_day_order_amount'],
                            d['first_day_order_count'], d['account_id']]
                li.append(tuple(res_data))
    except Exception as e:
        # Best-effort per-account fetch: log and continue with other accounts.
        logging.error('vx account:{}   error :{}'.format(str(y), str(e)))


def get_tt_data(account_info, li, st, et):
    """Fetch Bytedance (oceanengine) advertiser daily reports and append rows to *li*.

    :param account_info: (access_token, list_of_advertiser_ids).
    :param li: output list; one tuple per report row, in daily_tt column order.
    :param st: report start date.
    :param et: report end date.
    """

    def build_url_ad(path, query=""):
        # type: (str, str) -> str
        """Build an ad.oceanengine.com request URL from a path and querystring."""
        scheme, netloc = "https", "ad.oceanengine.com"
        return urlunparse((scheme, netloc, path, "", query, ""))

    access_token, advertiser_ids = account_info
    for advertiser_id in advertiser_ids:
        # BUG FIX: page_num was initialised once outside this loop, so every
        # advertiser after the first started paging at a stale page number and
        # silently skipped pages. Reset paging per advertiser.
        page_num = 1
        while True:
            my_args = {
                "start_date": st,
                "end_date": et,
                "page_size": 100,
                "page": page_num,
                'advertiser_id': advertiser_id,
            }
            PATH = "/open_api/2/report/advertiser/get/"
            query_string = urlencode(
                {k: v if isinstance(v, string_types) else json.dumps(v) for k, v in my_args.items()})
            url = build_url_ad(PATH, query_string)
            headers = {
                "Access-Token": access_token,
            }
            # timeout added for consistency with daily_reports_get — without it
            # a stalled connection hangs the whole daily job.
            rsp = requests.get(url, headers=headers, timeout=30)
            result = rsp.json()
            # Map bytedance fields onto daily_tt's column order
            # (date, cost, view_count, valid_click_count, ctr, ... account_id);
            # cost is scaled *100 to match the Tencent units, and columns the
            # bytedance report does not provide are stored as NULL.
            for row in result['data']['list']:
                campaign_info = (row['stat_datetime'][:10], row['cost'] * 100, row['show'], row['click'],
                                 row['ctr'], None, None,
                                 None, None, None, None,
                                 None, None, None, advertiser_id)
                li.append(campaign_info)
            total_page = result['data']['page_info']['total_page']
            if page_num >= total_page:
                break
            page_num += 1


def get_vx_list():
    """Return all wechat advertiser credential rows from advertiser_vx."""
    sql = "select account_id,wechat_account_id,access_token,refresh_token,name," \
          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_vx"
    return db.quchen_text.getData(sql)


def get_qq_list():
    """Return all qq advertiser credential rows from advertiser_qq."""
    sql = "select account_id,'',access_token,refresh_token,name," \
          "ifnull(stage,''),ifnull(pitcher,''),ifnull(platform,''),ifnull(book,'') from advertiser_qq"
    return db.quchen_text.getData(sql)


def mysql_insert_daily_vx(data):
    """Bulk-upsert wechat daily rows into daily_vx (REPLACE INTO)."""
    logging.info('start save daily_vx info')
    b = """replace into daily_vx (date,cost,view_count,valid_click_count,ctr,official_account_follow_rate,order_amount,	order_roi,order_count,order_rate,order_unit_price,web_order_cost,first_day_order_amount,first_day_order_count,account_id)	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(b, data)
    # BUG FIX: this completion message previously duplicated the "start" line.
    logging.info('finish save daily_vx info')


def mysql_insert_daily_qq(data):
    """Bulk-upsert qq daily rows into daily_qq (REPLACE INTO)."""
    a = """replace into daily_qq (date,view_count,valid_click_count,ctr,cpc,cost,web_order_count,web_order_rate,	web_order_cost,follow_count,order_amount,order_roi,platform_page_view_count,web_commodity_page_view_count,	from_follow_uv,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(a, data)


def mysql_insert_daily_tt(data):
    """Bulk-upsert bytedance daily rows into daily_tt (REPLACE INTO)."""
    b = """replace into daily_tt (date,cost,view_count,valid_click_count,ctr,    official_account_follow_rate,order_amount,	order_roi,order_count,order_rate,order_unit_price,	web_order_cost,first_day_order_amount,	first_day_order_count,account_id)	 values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    db.quchen_text.executeMany(b, data)


def get_daily_vx(st, et):
    """Pull wechat daily consumption for every vx account in parallel and save it."""
    token_list_v = get_vx_list()
    logging.info("获取vx账号:" + str(len(token_list_v)))
    time1 = time.time()
    li = []
    # Fan out one fetch per account; appends to the shared list are atomic.
    with ThreadPoolExecutor(max_workers=10) as executor:
        for y in token_list_v:
            executor.submit(get_v_data, y, li, st, et)
    logging.info('get_daily_vx:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
    mysql_insert_daily_vx(li)


def get_daily_qq(st, et):
    """Pull qq daily consumption for every qq account in parallel and save it."""
    token_list_q = get_qq_list()
    logging.info("获取qq账号:" + str(len(token_list_q)))
    time1 = time.time()
    li = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for x in token_list_q:
            executor.submit(get_q_data, x, li, st, et)
    logging.info('get_qq_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
    mysql_insert_daily_qq(li)


def get_daily_tt(st, et):
    """Refresh bytedance tokens, pull daily consumption per login, and save it."""

    def refresh_access_token(appid, account_id, secret, refresh_token):
        """Exchange a refresh_token for new tokens and persist them; return the new access_token."""
        open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
        uri = "oauth2/refresh_token/"
        refresh_token_url = open_api_url_prefix + uri
        data = {
            "appid": appid,
            "secret": secret,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        rsp = requests.post(refresh_token_url, json=data)
        rsp_data = rsp.json()
        new_refresh_token = rsp_data['data']['refresh_token']
        new_access_token = rsp_data['data']['access_token']
        # NOTE(review): tokens from the API response are interpolated straight
        # into SQL — if db.quchen_text.execute supports bind parameters, this
        # should be parameterized to rule out SQL injection.
        sql = f'''
        update bytedance_login_info
        set refresh_token='{new_refresh_token}' ,access_token='{new_access_token}'
        where appid='{appid}' and account_id='{account_id}'
        '''
        db.quchen_text.execute(sql)
        return new_access_token

    # 1. Load every bytedance login's refresh_token.
    sql = '''
    select appid,account_id,secret,refresh_token from bytedance_login_info
    '''
    accounts_info = db.quchen_text.getData(sql)
    # 2. Refresh each token (tokens are single-use) to get a live access_token.
    for account_info in accounts_info:
        appid, account_id, secret, refresh_token = account_info
        access_token = refresh_access_token(appid, account_id, secret, refresh_token)
        # 3. Collect the advertiser ids managed by this login.
        sql = f'''
        select distinct(advertiser_id) from advertiser_bytedance
        where appid='{appid}' and account_id='{account_id}'
        '''
        advertiser_ids = db.quchen_text.getData(sql)
        logging.info("获取头条账号:" + str(len(advertiser_ids)))
        advertiser_ids = [row[0] for row in advertiser_ids]
        account_info = (access_token, advertiser_ids)
        time1 = time.time()
        li = []
        get_tt_data(account_info, li, st, et)
        logging.info('get_tt_order:' + str(len(li)) + 'cost:' + str(int(time.time() - time1)))
        mysql_insert_daily_tt(li)


def get_token_bytedance():
    """One-off helper: exchange an auth_code for a bytedance access_token.

    Used manually when onboarding a new bytedance account; the secret and
    auth_code placeholders must be filled in before running.
    """
    open_api_url_prefix = "https://ad.oceanengine.com/open_api/"
    uri = "oauth2/access_token/"
    url = open_api_url_prefix + uri
    data = {
        "app_id": 1709866698360883,
        "secret": "****",
        "grant_type": "auth_code",
        "auth_code": "********"
    }
    rsp = requests.post(url, json=data)
    rsp_data = rsp.json()
    return rsp_data['data']['access_token']


def run(st, et):
    """Pull wechat, qq and bytedance consumption for the [st, et] date range."""
    logging.info('微信消耗数据拉取,开始')
    get_daily_vx(st, et)
    logging.info('微信消耗数据拉取,结束')
    logging.info('qq消耗数据拉取,开始')
    get_daily_qq(st, et)
    logging.info('qq消耗数据拉取,结束')
    logging.info('头条消耗数据拉取,开始')
    get_daily_tt(st, et)
    logging.info('头条消耗数据拉取,结束')


def old_cost_hourly():
    """Hourly job: refresh today's consumption only."""
    st = et = du.getNow()
    logging.info('消耗数据拉取,开始')
    run(st, et)
    logging.info('消耗数据拉取,结束')


def old_cost_daily():
    """Daily job: backfill the trailing 10 days (APIs restate recent data)."""
    st = du.get_n_days(-10)
    et = du.get_n_days(-1)
    run(st, et)


if __name__ == '__main__':
    # Manual backfill entry point: last 30 days of bytedance data only.
    st = du.get_n_days(-30)
    et = du.get_n_days(0)
    print(st, et)
    get_daily_tt(st, et)