ck 4 vuotta sitten
vanhempi
commit
d4930af6af

+ 58 - 35
app/api_data/cost_util.py

@@ -17,12 +17,14 @@ max_workers = 10
 count = []
 t = du.get_n_days(-10)
 
-def get_adcreatives(account_id,access_token,flag):  # 获取创意
+def get_adcreatives(account_id,access_token,flag,adc_ids,dt):  # 获取创意
+
     url = 'https://api.e.qq.com/v1.1/adcreatives/get'
     li =[]
     page = 1
 
     while True:
+
         parameters = {
             'access_token': access_token,
             'timestamp': int(time.time()),
@@ -30,12 +32,12 @@ def get_adcreatives(account_id,access_token,flag):  # 获取创意
             'fields': ('campaign_id', 'adcreative_id', 'adcreative_name', 'adcreative_elements', 'promoted_object_type', 'page_type',
             'page_spec', 'link_page_spec', 'universal_link_url', 'promoted_object_id','site_set'),
             "filtering": [{
-                "field": "created_time",
-                "operator": "GREATER_EQUALS",
-                "values":
-                    [
-                        DateUtils.str_to_stamp(t)
-                    ]}],
+                "field": "adcreative_id",
+                "operator": "IN",
+                "values": adc_ids.split(',')
+
+
+                    }],
             "account_id": account_id,
             "page": page,
             "page_size": 100,
@@ -46,8 +48,10 @@ def get_adcreatives(account_id,access_token,flag):  # 获取创意
             if type(parameters[k]) is not str:
                 parameters[k] = json.dumps(parameters[k])
 
+
         while True:
             h = requests.get(url, params=parameters)
+
             if h.status_code == 200:
                 r = h.json()
                 # print(r)
@@ -82,7 +86,7 @@ def get_adcreatives(account_id,access_token,flag):  # 获取创意
                         i['adcreative_id'],i['adcreative_name'],i['campaign_id'],image,title,
                         i.get('promoted_object_type',''),i.get('page_type',''),
                         i['page_spec'].get('page_id',''),i.get('promoted_object_id',''),
-                        '','','MP'
+                        '','','MP',account_id,dt
                     ))
                 else:
                     if len(i['adcreative_elements'])>0:
@@ -106,7 +110,7 @@ def get_adcreatives(account_id,access_token,flag):  # 获取创意
                             i['adcreative_id'], i['adcreative_name'], i['campaign_id'],image,title,
                             i.get('promoted_object_type', ''), i.get('page_type', ''),
                             i['page_spec'].get('page_id', ''), i.get('promoted_object_id', ''),
-                            ','.join(i['site_set']),description,'GDT'
+                            ','.join(i['site_set']),description,'GDT',account_id,dt
 
                         )
                     )
@@ -124,12 +128,12 @@ def get_adcreatives(account_id,access_token,flag):  # 获取创意
             break
     if len(li)>0:
         print(f"{account_id}有创意:",len(li))
-        sql='replace into adcreative_info values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '
+        sql='replace into adcreative_info values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '
         db.quchen_text.executeMany(sql,li)
 
 
 
-def images_info_get(account_id,access_token):  # 获取图片信息
+def images_info_get(account_id,access_token,image_ids):  # 获取图片信息
 
     fields = ('image_id','width','height','file_size','signature','preview_url')
     interface = 'images/get'
@@ -152,12 +156,11 @@ def images_info_get(account_id,access_token):  # 获取图片信息
         parameters = {
             "account_id": account_id,
             "filtering":[{
-                    "field": "created_time",
-                    "operator": "GREATER_EQUALS",
-                    "values":
-                    [
-                        DateUtils.str_to_stamp(t)
-                    ]}],
+                    "field": "image_id",
+                    "operator": "IN",
+                    "values": image_ids.split(',')
+
+                    }],
             "page": page,
             "page_size": 100
         }
@@ -225,7 +228,8 @@ def ad_info():
 """获取广告基础信息"""
 
 
-def get_ad_info(account_id, access_token, flag):
+def get_ad_info(account_id, access_token, flag,ad_ids,dt):
+
 
     path = 'ads/get'
     fields = ('ad_id', 'ad_name', 'adcreative_id', 'adgroup_id', 'campaign_id')
@@ -240,12 +244,11 @@ def get_ad_info(account_id, access_token, flag):
             'nonce': str(time.time()) + str(random.randint(0, 999999)),
             'fields': fields,
             "filtering": [{
-                "field": "created_time",
-                "operator": "GREATER_EQUALS",
+                "field": "ad_id",
+                "operator": "IN",
                 "values":
-                    [
-                        DateUtils.str_to_stamp(t)
-                    ]}],
+                    ad_ids.split(',')
+            }],
             "account_id": account_id,
             "page": page,
             "page_size": 100,
@@ -255,8 +258,14 @@ def get_ad_info(account_id, access_token, flag):
         for k in parameters:
             if type(parameters[k]) is not str:
                 parameters[k] = json.dumps(parameters[k])
-        r = requests.get(url, params=parameters).json()
 
+        while True:
+            r = requests.get(url, params=parameters).json()
+            code = r['code']
+            if code == 11017:
+                time.sleep(61)
+            else:
+                break
         # print(r)
         total_page = r['data']['page_info']['total_page']
 
@@ -268,12 +277,12 @@ def get_ad_info(account_id, access_token, flag):
         if r.get("data"):
             for i in r['data']['list']:
                 li.append((str(i['ad_id']), i['ad_name'], i['adcreative_id'], i['campaign_id'], i['adgroup_id'],
-                           account_id, flag))
+                           account_id, flag, dt))
 
 
     if li.__len__()>0:
         print(f"{account_id}有广告:",li.__len__())
-        sql = "replace into ad_info values(%s,%s,%s,%s,%s,%s,%s) "
+        sql = "replace into ad_info values(%s,%s,%s,%s,%s,%s,%s,%s) "
         db.quchen_text.executeMany(sql, li)
     db.close()
 
@@ -288,7 +297,7 @@ def get_ad_cost_day(account_id,access_token,flag,st,et):
 
 def ad_cost_day_gdt(account_id,access_token,st,et):
     url = 'https://api.e.qq.com/v1.3/daily_reports/get'
-    fields = ('date', 'ad_id', 'cost', 'view_count', 'ctr', 'follow_count')
+    fields = ('date', 'ad_id', 'cost', 'view_count', 'ctr', 'follow_count','web_order_count','order_amount')
     li = []
     page = 1
     while True:
@@ -311,8 +320,15 @@ def ad_cost_day_gdt(account_id,access_token,st,et):
         for k in parameters:
             if type(parameters[k]) is not str:
                 parameters[k] = json.dumps(parameters[k])
-        r = requests.get(url, params=parameters).json()
+
+        while True:
+            r  = requests.get(url, params=parameters).json()
         # print(r)
+            code =r['code']
+            if code==11017:
+                time.sleep(61)
+            else:
+                break
 
         if r.get("data"):
             for i in r['data']['list']:
@@ -320,7 +336,7 @@ def ad_cost_day_gdt(account_id,access_token,st,et):
                     li.append(
                         (
                             i['date'], i['ad_id'], i['cost']/100, i['view_count'], i['ctr']*i['view_count'],
-                            i['follow_count']
+                            i['follow_count'],i['web_order_count'],i['order_amount'],account_id,'GDT'
                         )
                     )
 
@@ -332,13 +348,13 @@ def ad_cost_day_gdt(account_id,access_token,st,et):
     # print(li)
     if len(li) > 0:
         print(f"{account_id} have ad cost :{len(li)} ")
-        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s)', li)
+        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li)
         db.close()
 
 
 def ad_cost_day_mp(account_id,access_token,st,et):
     url = 'https://api.e.qq.com/v1.3/daily_reports/get'
-    fields = ('date', 'ad_id', 'cost', 'view_count', 'valid_click_count', 'official_account_follow_count')
+    fields = ('date', 'ad_id', 'cost', 'view_count', 'valid_click_count', 'official_account_follow_count','order_count','order_amount')
     li = []
     page = 1
     while True:
@@ -361,8 +377,14 @@ def ad_cost_day_mp(account_id,access_token,st,et):
             if type(parameters[k]) is not str:
                 parameters[k] = json.dumps(parameters[k])
 
-        r = requests.get(url, params=parameters).json()
-
+        while True:
+            r = requests.get(url, params=parameters).json()
+            # print(r)
+            code = r['code']
+            if code == 11017:
+                time.sleep(61)
+            else:
+                break
 
         if r.get("data"):
             for i in r['data']['list']:
@@ -370,7 +392,8 @@ def ad_cost_day_mp(account_id,access_token,st,et):
                     li.append(
                         (
                             i['date'],i['ad_id'],i['cost']/100,i['view_count'],i['valid_click_count'],
-                            i['official_account_follow_count']
+                            i['official_account_follow_count'],i['order_count'],i['order_amount'],account_id,'MP'
+
                         )
                     )
 
@@ -382,7 +405,7 @@ def ad_cost_day_mp(account_id,access_token,st,et):
     # print(li)
     if len(li) > 0:
         print(f"{account_id} have ad cost :{len(li)} ")
-        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s)', li)
+        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li)
         db.close()
 
 def daily_reports_get(access_token, account_id, level, start_date, end_date, fields):  # 获取wx投放计划日报数据

+ 70 - 27
app/api_data/get_cost.py

@@ -5,8 +5,8 @@ from model.DataBaseUtils import MysqlUtils
 from concurrent.futures import ThreadPoolExecutor
 db = MysqlUtils()
 du = DateUtils()
-executor = ThreadPoolExecutor(max_workers=50)
 
+max_workers =10
 
 def get_accounts(filter=None):
 	if filter:
@@ -18,48 +18,87 @@ def get_accounts(filter=None):
 	return db.quchen_text.getData("select account_id,access_token,name channel,'MP' flag from advertiser_vx where (name !='' or name is not null) union "
 								   "select account_id,access_token,name channel,'GDT' flag from advertiser_qq where (name !='' or name is not null)")
 
-"""广告"""
-def ad():
 
-	for account in get_accounts():
-		executor.submit(get_ad_info, account[0], account[1],account[3])
+
+def ad(dt):
+	sql =f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(ad_id)  from ad_cost_day  a
+left join (
+select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+select account_id,access_token,'GDT' type  from advertiser_qq where (name !='' or name is not null)
+) b on a.account_id=b.account_id
+where a.dt='{dt}'
+GROUP BY b.account_id,b.access_token,b.type"""
+
+	accounts = db.quchen_text.getData(sql)
+	executor = ThreadPoolExecutor(max_workers=max_workers)
+
+	for account in accounts:
+		executor.submit(get_ad_info, account[0], account[1], account[2],account[3],dt)
 	executor.shutdown()
 
-	
-"""广告日消耗"""
-def ad_cost_day():
-	st = '2021-01-01'
-	et = '2021-03-26'
-	print(st,et)
 
 
+
+
+"""广告日消耗"""
+def ad_cost_day(dt):
+
+	st = et = dt
+	executor = ThreadPoolExecutor(max_workers=max_workers)
 	for account in get_accounts():
-		# print(account)
 		executor.submit(get_ad_cost_day, account[0], account[1],account[3],st,et)
 	executor.shutdown()
 
 
 
 
-"""图片"""
-def image():
-	# 默认拉取前10天创建的图片
-	for account in get_accounts():
-		executor.submit(images_info_get, account[0], account[1])
+
+
+def adcreative(dt):
+
+	sql = f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(adcreative_id)  from ad_info  a
+left join (
+select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+select account_id,access_token,'GDT' type from advertiser_qq where (name !='' or name is not null)
+) b on a.account_id=b.account_id
+where a.dt='{dt}'
+GROUP BY b.account_id,b.access_token,b.type"""
+
+	accounts = db.quchen_text.getData(sql)
+	executor = ThreadPoolExecutor(max_workers=max_workers)
+	for account in accounts:
+		executor.submit(get_adcreatives, account[0], account[1],account[2],account[3],dt)
 	executor.shutdown()
 
-"""创意"""
-def adcreative():
-	for account in get_accounts():
-		executor.submit(get_adcreatives, account[0], account[1],account[3])
+def image(dt):
+	sql=f"""SELECT b.account_id,b.access_token,b.type,GROUP_CONCAT(image_id)  from adcreative_info  a
+left join (
+select account_id,access_token,'MP' type  from advertiser_vx where (name !='' or name is not null) union 
+select account_id,access_token,'GDT' type from advertiser_qq where (name !='' or name is not null)
+) b on a.account_id=b.account_id
+where a.dt='{dt}'
+GROUP BY b.account_id,b.access_token,b.type"""
+	accounts = db.quchen_text.getData(sql)
+	executor = ThreadPoolExecutor(max_workers=max_workers)
+	for account in accounts:
+		executor.submit(images_info_get, account[0], account[1], account[3])
 	executor.shutdown()
 
 
-def run():
-	image()
-	ad()
-	adcreative()
-	ad_cost_day()
+
+def day():
+	"""
+	1.拉取有消耗的广告
+	2.用有消耗的广告id 去拉取广告基础信息
+	3.用第2步获取的创意id 去拉取广告创意基础信息
+	4.用创意信息中的图片id 去获取图片的基础信息
+	"""
+
+	dt = du.getNow()
+	ad_cost_day(dt)
+	ad(dt)
+	adcreative(dt)
+	image(dt)
 
 
 if __name__ == '__main__':
@@ -69,5 +108,9 @@ if __name__ == '__main__':
 	#
 	# account_id2= 14709511
 	# access_token2 = 'e87f7b6f860eaeef086ddcc9c3614678'
-	run()
+	# run()
+	# ad_cost_day()
+	#
+
+	day()
 

+ 15 - 0
app/etl/ad_cost_day_run.py

@@ -0,0 +1,15 @@
+from app.api_data import get_cost
+from app.etl.dw import dw_ad_day
+from model.DingTalkUtils import DingTalkUtils
+
+if __name__ == '__main__':
+    try:
+        get_cost.day() # 广告相关消耗数据
+    except:
+        DingTalkUtils.send('广告消耗数据拉取出错')
+        exit(0)
+
+    try:
+        dw_ad_day.day()
+    except:
+        DingTalkUtils.send("广告数据清洗出错")

+ 63 - 0
app/etl/dw/dw_ad_day.py

@@ -0,0 +1,63 @@
+import logging
+from model.DataBaseUtils import MysqlUtils,CkUtils
+from model.DateUtils import DateUtils
+logging.getLogger().setLevel(logging.WARNING)
+import pandas as pd
+db = MysqlUtils()
+ck = CkUtils()
+du=DateUtils()
+
+
+def run(dt):
+    sql=f"""SELECT a.dt,a.ad_id,b.ad_name,b.type,'',a.cost,view_count,click_count,follow_count,order_count,order_amount,
+            title,description,e.channel,pitcher,book,platform,stage,ifnull(image_id,'')
+            from 
+            ad_cost_day a 
+            left join ad_info b on a.ad_id=b.ad_id
+            left join adcreative_info c on b.adcreative_id=c.adcreative_id
+            left join channel_by_account_daily e on b.account_id=e.account_id and a.dt=e.dt
+            left join channel_info_daily f on e.channel=f.channel and e.dt=f.dt
+            
+            where a.dt='{dt}'  """
+
+
+    data = db.quchen_text.get_data_list(sql)
+
+    # print(data)
+
+    # 图片链接拼接
+
+    li = []
+    for i in data:
+        # print(i)
+        li.extend(i[-1].split(','))
+    # print(li)
+
+    sql3 = f"select image_id,preview_url from image_info where  image_id in ({str(set(li))[1:-1]})"
+
+    image_di = {}
+    image_data = db.quchen_text.getData(sql3)
+    for x in image_data:
+        image_di[x[0]] = x[1]
+
+    # print(image_di)
+
+    for i in data:
+        y = ''
+        for j in i[-1].split(','):
+            if image_di.get(j):
+                y = y + ',' + image_di.get(j)
+        i.append(y[1:])
+
+    # print(data)
+
+    db.dm.executeMany("replace into dw_ad_day values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",data)
+
+
+
+def day():
+    run(du.getNow())
+
+
+if __name__ == '__main__':
+    day()

+ 23 - 0
model/ComUtils.py

@@ -21,3 +21,26 @@ def get_random_str(num=5):
         salt += random.choice(H)
     return salt
 
+"""把一个列表切分多个列表"""
+def split_list(li,range=2):
+    data =[]
+    a = len(li)
+    x = 0
+    y = range
+
+    while True:
+        # print(x,y)
+        data.append(li[x:y])
+        x = y
+        y += range
+        if y >= a+range:
+            break
+    return data
+
+
+if __name__ == '__main__':
+    a=split_list([1,2,3,4,5,6,7,8,9,10],3)
+    print(a)
+
+
+

+ 20 - 13
model/DataBaseOperation.py

@@ -65,6 +65,13 @@ class MysqlOperation:
             li.append(list(i))
         return li
 
+    def getDataOneList(self,sql):
+        """获取一列"""
+        data = self.getData(sql)
+        li = []
+        for i in data:
+            li.append(i[0])
+        return li
 
 
     def execute(self, sql,data=None):
@@ -115,19 +122,19 @@ class MysqlOperation:
             log.info('sql cost: %s' % (time.time() - start))
         return df
 
-    def insertData(self, sql, args=None):
-        # if args:
-        #     log.debug(sql % tuple(args))
-        # else:
-        #     log.debug(sql)
-        start = time.time()
-        self.cursor.execute(sql, args=args)
-
-        if MYSQL_DEBUG:
-            sql_str = sql % tuple(args) if args else sql
-            log.info('sql: \n' + sql_str)
-            log.info('sql cost: %s' % (time.time() - start))
-        self.conn.commit()
+    # def insertData(self, sql, args=None):
+    #     # if args:
+    #     #     log.debug(sql % tuple(args))
+    #     # else:
+    #     #     log.debug(sql)
+    #     start = time.time()
+    #     self.cursor.execute(sql, args=args)
+    #
+    #     if MYSQL_DEBUG:
+    #         sql_str = sql % tuple(args) if args else sql
+    #         log.info('sql: \n' + sql_str)
+    #         log.info('sql cost: %s' % (time.time() - start))
+    #     self.conn.commit()
 
     def executeWithoutCommit(self, sql, args=None):
         return self.cursor.execute(sql, args=args)