ck 4 vuotta sitten
vanhempi
commit
f4a0fd9a9a

+ 516 - 0
app/api_data/cost_util.py

@@ -0,0 +1,516 @@
+
+import json
+import random
+import requests
+import time
+from datetime import datetime
+import pymysql
+import logging
+from concurrent.futures import ThreadPoolExecutor
+from model.DataBaseUtils import MysqlUtils
+logging.getLogger().setLevel(logging.WARNING)
+from model.ComUtils import *
+from model.DateUtils import DateUtils
du = DateUtils()
db = MysqlUtils()
max_workers = 10  # thread-pool size used by ad_info()
count = []  # NOTE(review): never read in this module — candidate for removal
t = du.get_n_days(-10)  # fetch window start: only objects created in the last 10 days
+
def get_adcreatives(account_id, access_token, flag):
    """Fetch all ad creatives of one account and upsert them into `adcreative_info`.

    Args:
        account_id: advertiser account id.
        access_token: Tencent Marketing API token for the account.
        flag: 'MP' (WeChat) extracts image/image_list elements; any other
              value is treated as GDT and extracts image/element_story.

    Only creatives created on/after the module-level date `t` are fetched.
    Non-200 responses are retried forever with a 1s pause (same policy as
    the other fetchers in this module).
    """
    url = 'https://api.e.qq.com/v1.1/adcreatives/get'
    rows = []
    page = 1

    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': ('campaign_id', 'adcreative_id', 'adcreative_name', 'adcreative_elements',
                       'promoted_object_type', 'page_type', 'page_spec', 'link_page_spec',
                       'universal_link_url', 'promoted_object_id', 'site_set'),
            "filtering": [{
                "field": "created_time",
                "operator": "GREATER_EQUALS",
                "values": [DateUtils.str_to_stamp(t)]}],
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False
        }
        # The API expects every non-string query parameter JSON-serialized.
        for k in parameters:
            if not isinstance(parameters[k], str):
                parameters[k] = json.dumps(parameters[k])

        while True:
            h = requests.get(url, params=parameters)
            if h.status_code == 200:
                r = h.json()
                break
            time.sleep(1)
            print("爬取失败 等待1s")

        # Error payloads carry no 'data' key: stop paging.
        if 'data' not in r:
            break

        for i in r['data']['list']:
            # An empty elements dict yields '' for every extracted field,
            # matching the old explicit len()>0 branches.
            elements = i.get('adcreative_elements') or {}
            if flag == 'MP':
                title = elements.get('title', '')
                if 'image' in elements:
                    image = elements.get('image', '')
                elif 'image_list' in elements:
                    image = ','.join(elements.get('image_list'))
                else:
                    image = ''
                rows.append((
                    i['adcreative_id'], i['adcreative_name'], i['campaign_id'], image, title,
                    i.get('promoted_object_type', ''), i.get('page_type', ''),
                    i['page_spec'].get('page_id', ''), i.get('promoted_object_id', ''),
                    '', '', 'MP'
                ))
            else:
                if 'image' in elements:
                    image = elements['image']
                elif 'element_story' in elements:
                    image = ','.join(x['image'] for x in elements['element_story'])
                else:
                    image = ''
                title = elements.get('title', '')
                description = elements.get('description', '')
                rows.append((
                    i['adcreative_id'], i['adcreative_name'], i['campaign_id'], image, title,
                    i.get('promoted_object_type', ''), i.get('page_type', ''),
                    i['page_spec'].get('page_id', ''), i.get('promoted_object_id', ''),
                    ','.join(i['site_set']), description, 'GDT'
                ))

        if r['data']['page_info']['total_page'] > page:
            page += 1
        else:
            break

    if rows:
        print(f"{account_id}有创意:", len(rows))
        sql = 'replace into adcreative_info values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '
        db.quchen_text.executeMany(sql, rows)
+
+
+
def images_info_get(account_id, access_token):
    """Fetch image metadata for one account and upsert it into `image_info`.

    Args:
        account_id: advertiser account id.
        access_token: Tencent Marketing API token for the account.

    Only images created on/after the module-level date `t` are fetched
    (roughly the last 10 days). Stored columns: image_id, width, height,
    signature, preview_url (file_size is requested but not persisted).
    """
    fields = ('image_id', 'width', 'height', 'file_size', 'signature', 'preview_url')
    url = 'https://api.e.qq.com/v1.3/images/get'
    page = 1
    li = []

    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "filtering": [{
                "field": "created_time",
                "operator": "GREATER_EQUALS",
                "values": [DateUtils.str_to_stamp(t)]}],
            "page": page,
            "page_size": 100
        }
        # The API expects every non-string query parameter JSON-serialized.
        for k in parameters:
            if not isinstance(parameters[k], str):
                parameters[k] = json.dumps(parameters[k])

        # Retry forever on non-200 responses (matches the module's policy).
        while True:
            h = requests.get(url, params=parameters)
            if h.status_code == 200:
                r = h.json()
                break
            time.sleep(1)
            print("请求出错 等待1s..")

        # Bug fix: the old code read r['data']['page_info'] even when the
        # response had no 'data' key (error payloads), raising KeyError.
        if 'data' not in r:
            break
        li.extend(r['data']['list'])
        if page >= r['data']['page_info']['total_page']:
            break
        page += 1

    data = [(i['image_id'], i['width'], i['height'], i['signature'], i['preview_url'])
            for i in li]
    print(f"{account_id} 有图片:", len(li))
    if li:
        sql = "replace into image_info value (%s,%s,%s,%s,%s)"
        db.quchen_text.executeMany(sql, data)
        db.close()
+
+
+
def ad_info():
    """Refresh `ad_info` for every advertiser account (GDT + MP) concurrently.

    Each worker runs get_ad_info(), which writes its own rows to the DB,
    so no result aggregation happens here.
    """
    accounts = db.quchen_text.getData("""
    select account_id,access_token,name channel,'GDT' type  from advertiser_qq where name !='' or name is not null 
     union 
     select account_id,access_token,name channel,'MP' type from advertiser_vx where name !='' or name is not null 
     
     """)
    executor = ThreadPoolExecutor(max_workers=max_workers)
    for account_id, access_token, _channel, flag in accounts:
        # Bug fix: get_ad_info takes (account_id, access_token, flag); the old
        # 4-argument call raised TypeError inside the pool and was silently
        # swallowed, so no ads were ever fetched. The dead `total_data`
        # aggregation/insert that depended on it is removed.
        executor.submit(get_ad_info, account_id, access_token, flag)
    executor.shutdown()
+
+
+"""获取广告基础信息"""
+
+
def get_ad_info(account_id, access_token, flag):
    """Fetch basic ad info for one account and upsert it into `ad_info`.

    Args:
        account_id: advertiser account id.
        access_token: Tencent Marketing API token for the account.
        flag: channel tag stored with each row ('MP' or 'GDT').

    Only ads created on/after the module-level date `t` are fetched.
    Stored columns: ad_id, ad_name, adcreative_id, campaign_id, adgroup_id,
    account_id, flag.
    """
    path = 'ads/get'
    fields = ('ad_id', 'ad_name', 'adcreative_id', 'adgroup_id', 'campaign_id')
    url = 'https://api.e.qq.com/v1.3/' + path
    li = []
    page = 1

    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "filtering": [{
                "field": "created_time",
                "operator": "GREATER_EQUALS",
                "values": [DateUtils.str_to_stamp(t)]}],
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False
        }
        # The API expects every non-string query parameter JSON-serialized.
        for k in parameters:
            if not isinstance(parameters[k], str):
                parameters[k] = json.dumps(parameters[k])
        r = requests.get(url, params=parameters).json()

        # Bug fix: error payloads carry no 'data' key; the old code read
        # r['data']['page_info'] unconditionally and crashed with KeyError.
        if not r.get("data"):
            break
        for i in r['data']['list']:
            li.append((str(i['ad_id']), i['ad_name'], i['adcreative_id'], i['campaign_id'],
                       i['adgroup_id'], account_id, flag))
        # Bug fix: the old loop checked pagination before consuming the page,
        # which always issued one request past the last page.
        if page >= r['data']['page_info']['total_page']:
            break
        page += 1

    if li:
        print(f"{account_id}有广告:", len(li))
        sql = "replace into ad_info values(%s,%s,%s,%s,%s,%s,%s) "
        db.quchen_text.executeMany(sql, li)
    db.close()
+
+
def get_ad_cost_day(account_id, access_token, flag, st, et):
    """Dispatch a daily-cost fetch to the channel-specific implementation.

    flag 'MP' selects the WeChat report endpoint; anything else is GDT.
    """
    fetch = ad_cost_day_mp if flag == 'MP' else ad_cost_day_gdt
    fetch(account_id, access_token, st, et)
+
+
def ad_cost_day_gdt(account_id, access_token, st, et):
    """Fetch GDT ad-level daily cost between st and et (inclusive) and
    upsert it into `ad_cost_day`.

    Stored columns: date, ad_id, cost (yuan — API reports cents, divided by
    100), view_count, click count reconstructed as ctr * view_count,
    follow_count. Rows with zero cost are skipped.
    """
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'cost', 'view_count', 'ctr', 'follow_count')
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "group_by": ['ad_id', 'date'],
            "level": 'REPORT_LEVEL_AD',
            "page": page,
            "page_size": 1000,
            "date_range": {
                "start_date": st,
                "end_date": et
            }
        }
        # The API expects every non-string query parameter JSON-serialized.
        for k in parameters:
            if not isinstance(parameters[k], str):
                parameters[k] = json.dumps(parameters[k])
        r = requests.get(url, params=parameters).json()

        # Bug fix: the old code read r['data']['page_info'] outside the
        # r.get("data") guard, so an error payload raised KeyError.
        data = r.get("data")
        if not data:
            break
        for i in data['list']:
            if i['cost'] > 0:
                li.append((
                    i['date'], i['ad_id'], i['cost'] / 100, i['view_count'],
                    i['ctr'] * i['view_count'], i['follow_count']
                ))
        if page >= data['page_info']['total_page']:
            break
        page += 1

    if len(li) > 0:
        print(f"{account_id} have ad cost :{len(li)} ")
        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s)', li)
        db.close()
+
+
def ad_cost_day_mp(account_id, access_token, st, et):
    """Fetch WeChat (MP) ad-level daily cost between st and et (inclusive)
    and upsert it into `ad_cost_day`.

    Stored columns: date, ad_id, cost (yuan — API reports cents, divided by
    100), view_count, valid_click_count, official_account_follow_count.
    Rows with zero cost are skipped.
    """
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'cost', 'view_count', 'valid_click_count', 'official_account_follow_count')
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "level": 'REPORT_LEVEL_AD_WECHAT',
            "page": page,
            "page_size": 1000,
            "date_range": {
                "start_date": st,
                "end_date": et
            }
        }
        # The API expects every non-string query parameter JSON-serialized.
        for k in parameters:
            if not isinstance(parameters[k], str):
                parameters[k] = json.dumps(parameters[k])
        r = requests.get(url, params=parameters).json()

        # Bug fix: the old code read r['data']['page_info'] outside the
        # r.get("data") guard, so an error payload raised KeyError.
        data = r.get("data")
        if not data:
            break
        for i in data['list']:
            if i['cost'] > 0:
                li.append((
                    i['date'], i['ad_id'], i['cost'] / 100, i['view_count'],
                    i['valid_click_count'], i['official_account_follow_count']
                ))
        if page >= data['page_info']['total_page']:
            break
        page += 1

    if len(li) > 0:
        print(f"{account_id} have ad cost :{len(li)} ")
        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s)', li)
        db.close()
+
def daily_reports_get(access_token, account_id, level, start_date, end_date, fields):
    """Fetch one page (up to 1000 rows) of MP daily-report data.

    Blocks, retrying once per second, until the API answers HTTP 200, then
    returns the decoded JSON payload.
    """
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    params = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
        'account_id': account_id,
        'level': level,
        'date_range': {'start_date': start_date, 'end_date': end_date},
        'page': 1,
        'page_size': 1000,
    }
    # The API expects every non-string query parameter JSON-serialized.
    for key, value in params.items():
        if type(value) is not str:
            params[key] = json.dumps(value)

    while True:
        resp = requests.get(url, params=params)
        if resp.status_code == 200:
            return resp.json()
        time.sleep(1)
        print("请求出错 等待1s..")
+
+
def daily_qq_reports_get(access_token, account_id, compaign_id, level, start_date, end_date, fields):
    """Fetch one page (up to 1000 rows) of GDT daily-report data for a
    single campaign and return the decoded JSON payload.

    Unlike daily_reports_get(), this issues exactly one request (no retry).
    """
    url = 'https://api.e.qq.com/v1.1/daily_reports/get'
    params = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
        'account_id': account_id,
        'filtering': [{
            'field': 'campaign_id',
            'operator': 'EQUALS',
            'values': [compaign_id],
        }],
        'level': level,
        'date_range': {'start_date': start_date, 'end_date': end_date},
        'page': 1,
        'page_size': 1000,
    }
    # The API expects every non-string query parameter JSON-serialized.
    for key, value in params.items():
        if type(value) is not str:
            params[key] = json.dumps(value)

    return requests.get(url, params=params).json()
+
+
+
def mysql_insert_adcreative(data):
    """Bulk upsert creative rows into the `adcreative` table.

    Args:
        data: iterable of 10-tuples matching the column list in the SQL below.

    On any DB error the transaction is rolled back and a failure message is
    printed (best-effort semantics preserved from the original).
    NOTE(review): credentials are hard-coded in source — rotate them and move
    to config/environment.
    """
    # Bug fixes: keyword arguments (pymysql 1.0 dropped positional connect
    # args), narrow except clause instead of bare `except:`, and the
    # connection is now always closed.
    conn = pymysql.connect(host='rm-bp1c9cj79872tx3aaro.mysql.rds.aliyuncs.com',
                           user='superc', password='Cc719199895', db='quchen_text')
    try:
        cursor = conn.cursor()
        sql = 'replace into adcreative (campaign_id,adcreative_id,adcreative_name,image_id,title,promoted_object_type,page_type,page_id,link_page_id,promoted_object_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
        cursor.executemany(sql, data)
        conn.commit()
        print('insert [adcreative] ', len(data))
    except Exception:
        conn.rollback()
        print('insert [adcreative] defeat')
    finally:
        conn.close()
+
+
+
+
+
if __name__ == '__main__':
    # Ad-hoc manual-test credentials left over from development.
    # NOTE(review): plaintext access tokens are committed here — rotate them
    # and load from config/env instead.
    account_id = 19016239
    access_token = '65407243a6072b7dee3a013b58225e16'
    #
    account_id2 = 14709511
    access_token2 = 'e87f7b6f860eaeef086ddcc9c3614678'
    # get_ad_cost_day(account_id2,access_token2,'GDT','2021-03-01','2021-03-24')
+
+
+
+
+
+
+

+ 73 - 0
app/api_data/get_cost.py

@@ -0,0 +1,73 @@
+
+from app.api_data.cost_util import *
+from model.DateUtils import DateUtils
+from model.DataBaseUtils import MysqlUtils
+from concurrent.futures import ThreadPoolExecutor
db = MysqlUtils()
du = DateUtils()
# Module-level pool shared by the task functions below; each of them calls
# executor.shutdown(), so only the first task function invoked per process
# can actually submit work.
executor = ThreadPoolExecutor(max_workers=50)
+
+
def get_accounts(filter=None):
	"""Return advertiser account rows (account_id, access_token, channel[, flag]).

	filter: 'MP' → WeChat accounts only, any other truthy value → QQ/GDT
	accounts only, None → union of both with an extra 'flag' column.
	NOTE(review): the filtered variants return 3 columns (no flag) while the
	union returns 4 — callers indexing account[3] need the unfiltered form.
	NOTE(review): `name !='' or name is not null` looks like it should be
	`and` — as written, rows with empty names pass; confirm against data.
	"""
	if filter == 'MP':
		return db.quchen_text.getData("select account_id,access_token,name channel from advertiser_vx where (name !='' or name is not null)")
	if filter:
		return db.quchen_text.getData("select account_id,access_token,name channel from advertiser_qq where (name !='' or name is not null)")
	return db.quchen_text.getData("select account_id,access_token,name channel,'MP' flag from advertiser_vx where (name !='' or name is not null) union "
								   "select account_id,access_token,name channel,'GDT' flag from advertiser_qq where (name !='' or name is not null)")
+
+"""广告"""
def ad():
	"""Refresh basic ad info for every account concurrently.

	Bug fix: uses its own pool instead of the module-level `executor` — that
	shared pool can only be shut down once, so sharing it across the task
	functions made every later call raise RuntimeError on submit.
	"""
	with ThreadPoolExecutor(max_workers=50) as pool:
		for account in get_accounts():
			pool.submit(get_ad_info, account[0], account[1], account[3])
+
+	
+"""广告日消耗"""
def ad_cost_day(st='2021-01-01', et='2021-03-26'):
	"""Pull ad-level daily cost for every account between st and et.

	Generalized: the previously hard-coded date range is now a pair of
	parameters whose defaults preserve the old behavior.
	Bug fix: uses its own pool instead of the module-level `executor`, which
	can only be shut down once across all task functions.
	"""
	print(st, et)
	with ThreadPoolExecutor(max_workers=50) as pool:
		for account in get_accounts():
			pool.submit(get_ad_cost_day, account[0], account[1], account[3], st, et)
+
+
+
+
+"""图片"""
def image():
	"""Refresh image metadata (last ~10 days of creations) for every account.

	Bug fix: uses its own pool instead of the module-level `executor`, which
	can only be shut down once across all task functions.
	"""
	with ThreadPoolExecutor(max_workers=50) as pool:
		for account in get_accounts():
			pool.submit(images_info_get, account[0], account[1])
+
+"""创意"""
def adcreative():
	"""Refresh creative metadata for every account concurrently.

	Bug fix: uses its own pool instead of the module-level `executor`, which
	can only be shut down once across all task functions.
	"""
	with ThreadPoolExecutor(max_workers=50) as pool:
		for account in get_accounts():
			pool.submit(get_adcreatives, account[0], account[1], account[3])
+
+
def run():
	# Full refresh: images and ads first so creatives/cost can join on them.
	# NOTE(review): all four task functions share the module-level executor
	# and each calls executor.shutdown(); after the first shutdown, further
	# submits raise RuntimeError — confirm whether anything after image()
	# actually runs.
	image()
	ad()
	adcreative()
	ad_cost_day()
+
+
if __name__ == '__main__':
	# MP
	# account_id = 18516323
	# access_token = '262deda76aec00c2e144e83bd3c0b2a2'
	#
	# account_id2= 14709511
	# access_token2 = 'e87f7b6f860eaeef086ddcc9c3614678'
	# Full refresh of images, ads, creatives and daily cost.
	run()
+

+ 0 - 0
app/etl/dw/dw_ad_day.py


+ 94 - 0
app/etl/dw/image_day.py

@@ -0,0 +1,94 @@
+import logging
+from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DateUtils import DateUtils
+logging.getLogger().setLevel(logging.WARNING)
+import pandas as pd
db = MysqlUtils()
ck = CkUtils()  # NOTE(review): never used in this module — candidate for removal
du=DateUtils()
def run(dt):
    """Aggregate per-image ad cost for day `dt` into dm_image_cost_day.

    Covers only single-image creatives (image_id contains no comma);
    multi-image creatives are handled separately by run2().
    NOTE(review): `dt` is interpolated into the SQL via f-string — safe only
    as long as dt is an internally generated date string; confirm no user
    input reaches here.
    """
    # Cost of single-image creatives.
    sql=f"""replace into dm_image_cost_day 
            select a.dt,type,book,signature,sum(cost),sum(view_count),sum(click_count),sum(follow_count),min(preview_url) from 
            (SELECT a.dt,a.ad_id,a.cost,c.image_id,preview_url,signature,view_count,click_count,follow_count,ad_name,b.type,title,description,e.channel,stage,pitcher,platform,book
            from 
            ad_cost_day a 
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            left join image_info g on c.image_id=g.image_id
            where a.dt='{dt}' and INSTR(c.image_id,',')=0 and preview_url is not null  ) a
            GROUP BY signature,book,type,a.dt 
            
                        """
    db.quchen_text.execute(sql)
+
    # Multi-image creatives are aggregated by run2() below.
+
def run2(dt):
    """Aggregate cost for multi-image creatives on day `dt` into dm_image_cost_day.

    Creatives whose image_id is a comma-separated list are expanded: the
    preview URLs and signatures of each member image are looked up from
    image_info, re-joined as comma-separated strings, and the rows are then
    grouped by (dt, type, book, signature-list) with pandas before insert.
    NOTE(review): `dt` is f-string-interpolated into SQL, and sql3 breaks if
    `data` is empty (`in ()`); confirm both are acceptable for this backfill.
    """
    sql2 = f"""
            SELECT dt,image_id,sum(cost),sum(view_count),sum(click_count),sum(follow_count),type,book
            FROM
            
            (SELECT a.dt,a.ad_id,a.cost,c.image_id,view_count,click_count,follow_count,ad_name,b.type,title,description,e.channel,stage,pitcher,platform,book
            from 
            ad_cost_day a 
            left join ad_info b on a.ad_id=b.ad_id
            left join adcreative_info c on b.adcreative_id=c.adcreative_id
            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
            
            where a.dt='{dt}' and INSTR(c.image_id,',')>0 ) a
            group by image_id,dt,type,book
"""

    data =db.quchen_text.get_data_list(sql2)
    # print(data)
    # Collect every individual image id referenced by any row (column 1
    # holds the comma-separated image_id list).
    li=[]
    for i in data:
        li.extend(i[1].split(','))
    # print(set(li))

    sql3 =f"select image_id,preview_url,signature from image_info where  image_id in ({str(set(li))[1:-1]})"

    # image_di: image_id -> preview_url; di2: image_id -> signature.
    image_di={}
    di2 = {}
    image_data =db.quchen_text.getData(sql3)
    for x in image_data:
        image_di[x[0]]=x[1]
        di2[x[0]]=x[2]
    # print(image_di)


    # Append two derived columns to each row: joined preview URLs (col 8)
    # and joined signatures (col 9), skipping ids missing from image_info.
    for i in data:
        y = ''
        p =''
        for j in i[1].split(','):
            if image_di.get(j):
                y = y+','+image_di.get(j)
                p = p+','+di2.get(j)
        i.append(y[1:])
        i.append(p[1:])


    df = pd.DataFrame(data)
    # print(df)

    # Group by dt(0), type(6), book(7), signature-list(9); sum the metric
    # columns (2-5) and keep the first preview-url list (min of col 8).
    df2 =df.groupby([0,6,7,9],as_index=False).agg({2:'sum',3:'sum',4:'sum',5:'sum',8:'min'})
    # print(df2)
    # print()

    db.quchen_text.executeMany('replace into dm_image_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s)',df2.values.tolist())
+
+
+
def day():
    """Backfill dm_image_cost_day for every date from 2020-04-01 to 2021-01-01."""
    dates = du.getDateLists('2020-04-01', '2021-01-01')
    for dt in dates:
        run(dt)
        run2(dt)
+        run2(i)
+
+
if __name__ == '__main__':

    # Run the full historical backfill.
    day()

+ 4 - 0
app/etl/dw/test.py

@@ -0,0 +1,4 @@
"""Scratch check of pandas groupby-sum behavior with as_index=False."""
import pandas as pd

df = pd.DataFrame([[1, 2, 4], [4, 5, 6], [1, 3, 4]], columns=['a', 'b', 'c'])
print(df)
# as_index=False keeps the group key 'a' as a regular column in the result.
grouped = df.groupby('a', as_index=False).sum()
print(grouped)

+ 1 - 1
model/DataBaseOperation.py

@@ -89,7 +89,7 @@ class MysqlOperation:
         # if MYSQL_DEBUG:
         #     log.info('sql: \n' + sql)
         #     log.info('sql cost: %s' % (time.time() - start))
-        print(f"affect rows :{k}")
+        print(f"\033[1;36maffect rows :{k} \033[0m")
 
     def getOne(self,sql, args=None):
         result = self.getData(sql, args)