123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- from logging import handlers
- from model.DateUtils import DateUtils
- import time
- import logging
- from app.api_data.tx_ad_cost.get_cost import ad_cost_day, get_accounts
- from app.etl.dw import dw_image_cost_day
- from model.sql_models import DB
- from config import using_config
- from model.DataBaseUtils import MysqlUtils
- import random
- import pandas
- import json
- import requests
- from concurrent.futures import ThreadPoolExecutor
- import threading
# Shared module-level helpers, constructed once at import time (same order as before).
du, db = DateUtils(), MysqlUtils()  # date helper / MySQL helper; get_data() recreates `db` each cycle
qucheng_db = DB(config=using_config.quchen_text)  # provides DBSession() used to write ad_cost_day
def _fetch_mp_report_rows(account_id, access_token, st, et):
    """Download every page of the WeChat ad-level daily report for one account.

    Args:
        account_id: advertiser account id sent as ``account_id``.
        access_token: access token for that account.
        st, et: ``start_date`` / ``end_date`` of the report range (assumed to be
            'YYYY-MM-DD' strings — confirm against DateUtils.get_n_days).

    Returns:
        A list of 11-tuples ``(date, ad_id, adgroup_id, cost, view_count,
        valid_click_count, official_account_follow_count, order_count,
        order_amount, account_id, 'MP')`` for rows with ``cost > 0``.
        ``cost`` and ``order_amount`` are divided by 100 (presumably cents to
        yuan — confirm against the API docs). Returns ``None`` when a response
        carries no ``data`` payload, so callers write nothing for the account.
    """
    # API doc: https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'valid_click_count',
              'official_account_follow_count', 'order_count', 'order_amount')
    rows = []
    page = 1
    while True:
        while True:
            # timestamp/nonce are rebuilt on every attempt so a retried request
            # never re-sends the nonce of the previous attempt.
            parameters = {
                'access_token': access_token,
                'timestamp': int(time.time()),
                'nonce': str(time.time()) + str(random.randint(0, 999999)),
                'fields': fields,
                'account_id': account_id,
                'level': 'REPORT_LEVEL_AD_WECHAT',
                'page': page,
                'page_size': 1000,
                'date_range': {
                    'start_date': st,
                    'end_date': et,
                },
            }
            # Non-string values are sent JSON-encoded in the query string.
            parameters = {k: v if isinstance(v, str) else json.dumps(v)
                          for k, v in parameters.items()}
            resp = requests.get(url, params=parameters, timeout=5).json()
            # Code 11017 is handled as "retry later": wait a minute and try again.
            if resp.get('code') != 11017:
                break
            time.sleep(61)

        data = resp.get('data')
        if not data:
            # Without `data` there is no page_info either; stop instead of
            # crashing or re-requesting the same page forever.
            logging.error(f"{account_id} daily_reports/get returned no data: {resp}")
            return None

        for item in data.get('list', []):
            if item['cost'] > 0:
                rows.append((
                    item['date'], item['ad_id'], item['adgroup_id'],
                    item['cost'] / 100, item['view_count'],
                    item['valid_click_count'],
                    item['official_account_follow_count'],
                    item['order_count'], item['order_amount'] / 100, account_id,
                    'MP',
                ))

        if page >= data['page_info']['total_page']:
            return rows
        page += 1


def _merge_rows_by_ad(rows):
    """Collapse report rows sharing (date, ad_id, account_id, 'MP') into one row.

    Columns 3-8 (cost .. order_amount) are summed per group. Column 2 is replaced
    by the comma-joined adgroup_ids of the group. Returns a list of 11-tuples in
    the same column order as the input.
    """
    df = pandas.DataFrame(rows)
    grouped = df.groupby([0, 1, 9, 10])
    adgroup_ids = {key: ','.join(str(x) for x in group[2].tolist())
                   for key, group in grouped}
    merged = []
    for key, row in grouped.agg('sum').iterrows():
        # Rebuild the original column order: date, ad_id, <summed cols>, account_id, 'MP'.
        new_row = list(key[0:2]) + row.tolist() + list(key[2:])
        new_row[2] = adgroup_ids[key]
        merged.append(tuple(new_row))
    return merged


def _save_ad_cost_rows(rows):
    """REPLACE the given 11-column rows into ad_cost_day using bound parameters.

    The session is always closed, and it is rolled back if any statement or the
    commit fails.
    """
    sql = ('replace into ad_cost_day values('
           ':p0, :p1, :p2, :p3, :p4, :p5, :p6, :p7, :p8, :p9, :p10)')
    session = qucheng_db.DBSession()
    try:
        for row in rows:
            # pandas yields numpy scalars; .item() converts them to plain Python
            # values so any DB driver can bind them.
            params = {f'p{idx}': (val.item() if hasattr(val, 'item') else val)
                      for idx, val in enumerate(row)}
            session.execute(sql, params)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def ad_cost_day_mp(account_id, access_token, st, et):
    """Sync one account's per-ad WeChat daily cost for [st, et] into ad_cost_day.

    Fetches all report pages, merges rows per (date, ad_id), and replaces them
    into ad_cost_day. Nothing is written when the account has no spend or when
    the API returns no data.
    """
    rows = _fetch_mp_report_rows(account_id, access_token, st, et)
    if not rows:
        return
    # TODO: ask what adgroup_id / campaign_id are used for; reconcile the ad-level numbers.
    merged = _merge_rows_by_ad(rows)
    logging.info(f"{account_id} have ad cost :{len(merged)} ")
    _save_ad_cost_rows(merged)
def get_ad_data(*, ref_dt='2021-08-26', limit=800, max_workers=100):
    """Refresh today's MP ad cost for the selected accounts, concurrently.

    Accounts are selected as the distinct account_ids in ad_cost_day on
    ``ref_dt``, ordered by cost desc and limited to ``limit``. Only accounts that
    also come back from get_accounts() are synced. Each one runs ad_cost_day_mp
    for today (du.get_n_days(0)) in a thread pool.

    Args:
        ref_dt: 'YYYY-MM-DD' date used to pick accounts. The default keeps the
            previously hard-coded value; NOTE(review): a fixed past date looks
            like a leftover — confirm whether a rolling date was intended.
        limit: maximum number of accounts to select.
        max_workers: thread-pool size.

    A failure for one account is logged with its traceback and does not stop
    the others. The call returns once every submitted account has finished.
    """
    # getData takes a raw SQL string, so validate before interpolating.
    time.strptime(ref_dt, '%Y-%m-%d')
    limit = int(limit)

    st = et = du.get_n_days(0)
    sql = f'''select distinct(account_id) as d_account_id from ad_cost_day acd
        where dt='{ref_dt}'
        order by cost desc
        limit {limit}'''
    accounts_use_set = {row[0] for row in db.quchen_text.getData(sql)}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            (account[0], executor.submit(ad_cost_day_mp, account[0], account[1], st, et))
            for account in get_accounts()
            if account[0] in accounts_use_set
        ]
        # .result() blocks until the task is done and re-raises its exception,
        # which would otherwise be silently dropped.
        for account_id, future in futures:
            try:
                future.result()
            except Exception:
                logging.exception(f'{account_id} ad_cost_day_mp failed')
def get_data():
    """Run the sync loop forever.

    Each cycle:
      1. get_ad_data() pulls today's MP ad cost into ad_cost_day;
      2. dw_image_cost_day.run(today) refreshes the downstream table;
      3. sleeps 3 minutes, or 1 minute after a failure;
      4. closes and recreates the module-level MySQL helper ``db``.

    An exception inside a cycle is logged with its traceback and the loop
    continues. KeyboardInterrupt is not caught, so Ctrl-C still stops it.
    """
    global db
    while True:
        try:
            logging.info('获取开始')
            get_ad_data()
            logging.info('获取数据,结束')
            logging.info('dw_image_cost 进行数据更新')
            dw_image_cost_day.run(du.get_n_days(0))
            logging.info('dw_image_cost 进行数据更新,结束')
            time.sleep(60 * 3)
        except Exception:
            # Long-running daemon: log and retry rather than exiting.
            logging.exception('get_data cycle failed, retrying in 60s')
            time.sleep(60)
        # Reconnect on every cycle, including after a failure, when the old
        # connection is the most likely culprit.
        try:
            db.close()
        except Exception:
            logging.exception('closing MysqlUtils failed')
        try:
            db = MysqlUtils()
        except Exception:
            logging.exception('reconnecting MysqlUtils failed')
if __name__ == '__main__':
    # Log both to a size-rotated file (10 MiB per file, 5 backups) and to the console.
    file_handler = handlers.RotatingFileHandler(
        './get_ad_cost_min.log',
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8',
    )
    console_handler = logging.StreamHandler()  # for console output
    log_format = "%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    logging.basicConfig(
        handlers=[file_handler, console_handler],
        level=logging.INFO,
        format=log_format,
    )
    get_data()
|