from logging import handlers
from model.DateUtils import DateUtils
import time
import logging
from app.api_data.tx_ad_cost.get_cost import ad_cost_day, get_accounts
from app.etl.dw import dw_image_cost_day
from model.sql_models import DB
from config import using_config
from model.DataBaseUtils import MysqlUtils
import random
import pandas
import json
import requests
from concurrent.futures import ThreadPoolExecutor

du = DateUtils()
db = MysqlUtils()
qucheng_db = DB(config=using_config.quchen_text)


def ad_cost_day_mp(account_id, access_token, st, et):
    # API docs: https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'valid_click_count',
              'official_account_follow_count', 'order_count', 'order_amount')
    li = []
    page = 1
    total_page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            'account_id': account_id,
            'level': 'REPORT_LEVEL_AD_WECHAT',
            'page': page,
            'page_size': 1000,
            'date_range': {
                'start_date': st,
                'end_date': et
            }
        }
        # The API expects non-string values to be JSON-encoded in the query string
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])

        # Retry while the API reports rate limiting (error code 11017)
        while True:
            r = requests.get(url, params=parameters, timeout=5).json()
            if r['code'] == 11017:
                time.sleep(61)
            else:
                break

        if r.get('data'):
            for i in r['data']['list']:
                if i['cost'] > 0:
                    li.append((
                        i['date'],
                        i['ad_id'],
                        i['adgroup_id'],
                        i['cost'] / 100,          # cost is returned in cents
                        i['view_count'],
                        i['valid_click_count'],
                        i['official_account_follow_count'],
                        i['order_count'],
                        i['order_amount'] / 100,  # order_amount is returned in cents
                        account_id,
                        'MP'
                    ))
            total_page = r['data']['page_info']['total_page']
        else:
            # No data in the response: stop paging instead of raising a KeyError
            break
        if page >= total_page:
            break
        page += 1

    if len(li) > 0:
        # TODO: confirm what adgroup_id and campaign_id are used for
        # Aggregate rows so they line up with the ad-level data
        li_df = pandas.DataFrame(li)
        li_df_g = li_df.groupby([0, 1, 9, 10])  # date, ad_id, account_id, media
        li_new = []
        adgroup_id_dict = {}
        for index, group in li_df_g:
            # Record which adgroup_ids are merged into each aggregated row
            adgroup_id_dict[index] = ','.join([str(i) for i in group[2].tolist()])
        for index, row in li_df_g.agg('sum').iterrows():
            new_row = list(index[0:2]) + row.tolist() + list(index[2:])
            new_row[2] = adgroup_id_dict[index]  # replace the meaningless summed adgroup_id
            li_new.append(tuple(new_row))
        logging.info(f"{account_id} has {len(li_new)} ad cost rows")
        # NOTE: str.format() interpolation is fragile here; a bound-parameter
        # alternative is sketched after get_ad_data below
        qc_session = qucheng_db.DBSession()
        for row in li_new:
            qc_session.execute(
                '''replace into ad_cost_day values('{}',{},'{}',{},{},{},{},{},{},'{}','{}')'''.format(*row))
        qc_session.commit()


def get_ad_data():
    executor = ThreadPoolExecutor(max_workers=100)
    st = du.get_n_days(0)
    et = du.get_n_days(0)
    # NOTE: dt is hardcoded; only the 800 highest-spending accounts of that day are pulled
    sql = '''select distinct(account_id) as d_account_id from ad_cost_day acd
             where dt='2021-08-26' order by cost desc limit 800'''
    accounts_use = db.quchen_text.getData(sql)
    accounts_use_set = set()
    for accounts in accounts_use:
        accounts_use_set.add(accounts[0])
    future_list = []
    for account in get_accounts():
        if account[0] in accounts_use_set:
            future = executor.submit(ad_cost_day_mp, account[0], account[1], st, et)
            future_list.append(future)
    # Poll until every future finishes; done() does not re-raise worker
    # exceptions, so failures inside ad_cost_day_mp are silently dropped here
    for future in future_list:
        while True:
            if future.done():
                break
            time.sleep(0.1)
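
# ---------------------------------------------------------------------------
# Hedged sketch, not wired into the pipeline above. ad_cost_day_mp builds its
# "replace into" statement with str.format(), which breaks on values that
# contain quotes and is open to SQL injection. Assuming qucheng_db.DBSession()
# returns a SQLAlchemy session (the DBSession()/commit() usage suggests it),
# a bound-parameter variant could look like the function below; the function
# name and parameter names are hypothetical, and the column order mirrors the
# tuple layout in li_new.
# ---------------------------------------------------------------------------
def save_ad_cost_rows(session, rows):
    """Write ad_cost_day rows with bound parameters instead of str.format()."""
    from sqlalchemy import text  # local import: illustration only
    stmt = text(
        'replace into ad_cost_day values '
        '(:dt, :ad_id, :adgroup_id, :cost, :view_count, :valid_click_count, '
        ':follow_count, :order_count, :order_amount, :account_id, :media)'
    )
    keys = ('dt', 'ad_id', 'adgroup_id', 'cost', 'view_count',
            'valid_click_count', 'follow_count', 'order_count',
            'order_amount', 'account_id', 'media')
    for row in rows:
        session.execute(stmt, dict(zip(keys, row)))
    session.commit()
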
def get_data():
    # Main loop: 1. fetch cost data  2. refresh dw_image_cost  3. sleep
    while True:
        try:
            logging.info('fetch start')
            # ad_cost_day(du.get_n_days(0), du.get_n_days(0))
            get_ad_data()
            logging.info('fetch done')
            logging.info('dw_image_cost refresh start')
            # Recreate the MySQL handle so the long-running loop does not
            # keep reusing a stale connection
            global db
            db.close()
            db = MysqlUtils()
            dw_image_cost_day.run(du.get_n_days(0))
            logging.info('dw_image_cost refresh done')
            time.sleep(60 * 3)
        except Exception as e:
            logging.error('error: %s', e)
            time.sleep(60)


if __name__ == '__main__':
    logging.basicConfig(
        handlers=[
            handlers.RotatingFileHandler('./get_ad_cost_min.log',
                                         maxBytes=10 * 1024 * 1024,
                                         backupCount=5,
                                         encoding='utf-8'),
            logging.StreamHandler()  # also log to the console
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    get_data()
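
# ---------------------------------------------------------------------------
# Hedged sketch, appended after the entry point purely as an illustration.
# The inner retry loop in ad_cost_day_mp sleeps 61 seconds and retries
# indefinitely while the API keeps returning rate-limit code 11017. A bounded
# variant is sketched below; max_retries and the linear backoff schedule are
# assumptions, not values taken from the original code.
# ---------------------------------------------------------------------------
def get_report_page(url, params, max_retries=5):
    """GET one report page, retrying while the API reports rate limiting."""
    for attempt in range(1, max_retries + 1):
        resp = requests.get(url, params=params, timeout=5).json()
        if resp.get('code') != 11017:
            return resp
        time.sleep(61 * attempt)  # wait longer after each consecutive hit
    raise RuntimeError(f'rate limit (11017) not cleared after {max_retries} retries')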