Explorar o código

MOD:添加 分钟级消耗数据获取

cxyu %!s(int64=3) %!d(string=hai) anos
pai
achega
e5134ce6a6
Modificáronse 1 ficheiros con 175 adicións e 0 borrados
  1. 175 0
      example/get_ad_cost_min.py

+ 175 - 0
example/get_ad_cost_min.py

@@ -0,0 +1,175 @@
+from logging import handlers
+from model.DateUtils import DateUtils
+import time
+import logging
+from app.api_data.tx_ad_cost.get_cost import ad_cost_day, get_accounts
+from app.etl.dw import dw_image_cost_day
+from model.sql_models import DB
+from config import using_config
+from model.DataBaseUtils import MysqlUtils
+import random
+import pandas
+import json
+import requests
+from concurrent.futures import ThreadPoolExecutor
+import threading
+
+du = DateUtils()
+db = MysqlUtils()
+qucheng_db = DB(config=using_config.quchen_text)
+
+
+def ad_cost_day_mp(account_id, access_token, st, et):
+    # 接口文档 https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
+    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
+    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'valid_click_count', 'official_account_follow_count',
+              'order_count', 'order_amount')
+    li = []
+    page = 1
+    total_page = 1
+    while True:
+        parameters = {
+            'access_token': access_token,
+            'timestamp': int(time.time()),
+            'nonce': str(time.time()) + str(random.randint(0, 999999)),
+            'fields': fields,
+            "account_id": account_id,
+            "level": 'REPORT_LEVEL_AD_WECHAT',
+            "page": page,
+            "page_size": 1000,
+            "date_range": {
+                "start_date": st,
+                "end_date": et
+            }
+        }
+
+        for k in parameters:
+            if type(parameters[k]) is not str:
+                parameters[k] = json.dumps(parameters[k])
+
+        while True:
+            r = requests.get(url, params=parameters, timeout=5)
+            r = r.json()
+            # import pandas as pd
+            # logging.info(pd.DataFrame(r['data']['list']))
+
+            code = r['code']
+            if code == 11017:
+                time.sleep(61)
+            else:
+                break
+
+        if r.get("data"):
+            for i in r['data']['list']:
+                if i['cost'] > 0:
+                    li.append(
+                        (
+                            i['date'], i['ad_id'], i['adgroup_id'],
+                            i['cost'] / 100, i['view_count'],
+                            i['valid_click_count'],
+                            i['official_account_follow_count'],
+                            i['order_count'], i['order_amount'] / 100, account_id,
+                            'MP'
+                        )
+                    )
+
+            # print(r)
+            total_page = r['data']['page_info']['total_page']
+        if page >= total_page:
+            break
+        else:
+            page += 1
+    # logging.info(li)
+    # exit()
+    if len(li) > 0:
+        # TODO:询问一下adgroup_id,campaign_id作用
+        # 对一下ad的数据
+        li_df = pandas.DataFrame(li)
+        li_df_g = li_df.groupby([0, 1, 9, 10])
+        li_new = []
+        adgroup_id_dict = {}
+        for index, group in li_df_g:
+            adgroup_id_dict[index] = ','.join([str(i) for i in group[2].tolist()])
+        for index, row in li_df_g.agg('sum').iterrows():
+            new_row = row.tolist()
+            new_row = list(index[0:2]) + new_row + list(index[2:])
+            new_row[2] = adgroup_id_dict[index]
+            li_new.append(tuple(new_row))
+        logging.info(f"{account_id} have ad cost :{len(li_new)} ")
+        # db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,'
+        #                            '%s,%s,%s,%s,%s,%s)', li_new)
+        qc_session = qucheng_db.DBSession()
+        for _ in li_new:
+            qc_session.execute(
+                '''replace into ad_cost_day values('{}',{},'{}',{},{},{},{},{},{},'{}','{}')'''.format(*_))
+            qc_session.commit()
+
+
+def get_ad_data():
+    max_workers = 100
+    executor = ThreadPoolExecutor(max_workers=max_workers)
+    st = du.get_n_days(0)
+    et = du.get_n_days(0)
+    sql = '''select distinct(account_id) as d_account_id from ad_cost_day acd 
+            where dt='2021-08-26'
+            order by cost desc
+            limit 800'''
+    accounts_use = db.quchen_text.getData(sql)
+    accounts_use_set = set()
+
+    for accounts in accounts_use:
+        accounts_use_set.add(accounts[0])
+
+    thread_list = []
+    for account in get_accounts():
+        if account[0] in accounts_use_set:
+            # ad_cost_day_mp(account[0], account[1], st, et)
+            # one = threading.Thread(target=ad_cost_day_mp, args=(account[0], account[1], st, et))
+            # one.start()
+            # thread_list.append(one)
+            thread_tmp=executor.submit(ad_cost_day_mp, account[0], account[1], st, et)
+            thread_list.append(thread_tmp)
+
+    for _ in thread_list:
+        while True:
+            if _.done():
+                break
+            time.sleep(0.1)
+
+def get_data():
+    while True:
+        try:
+            # 1.获取数据
+            # 2.dw_image_cost进行数据更新
+            # 3.休眠
+            logging.info('获取开始')
+            # ad_cost_day(du.get_n_days(0), du.get_n_days(0))
+            get_ad_data()
+            logging.info('获取数据,结束')
+            logging.info('dw_image_cost 进行数据更新')
+
+            dw_image_cost_day.run(du.get_n_days(0))
+            logging.info('dw_image_cost 进行数据更新,结束')
+            time.sleep(60*3)
+            global db
+            db.close()
+            db = MysqlUtils()
+        except Exception as e:
+            raise
+            print(e)
+            time.sleep(60)
+
+
+if __name__ == '__main__':
+    logging.basicConfig(
+        handlers=[
+            logging.handlers.RotatingFileHandler('./get_ad_cost_min.log',
+                                                 maxBytes=10 * 1024 * 1024,
+                                                 backupCount=5,
+                                                 encoding='utf-8')
+            , logging.StreamHandler()  # 供输出使用
+        ],
+        level=logging.INFO,
+        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
+    )
+    get_data()