import json
import logging
import random  # BUG FIX: random.randint is used throughout but was never imported explicitly
import time
from concurrent.futures import ThreadPoolExecutor

import pymysql
import requests

from model.DataBaseUtils import MysqlUtils
from model.ComUtils import *
from model.DateUtils import DateUtils

du = DateUtils()
db = MysqlUtils()
max_workers = 10  # thread-pool size used by ad_info()
count = []
t = du.get_n_days(-10)


def get_campaign(account_id, access_token, flag, campaign_ids, dt):
    """Pull campaign meta data for one account and upsert it into campaign_info.

    :param account_id: advertiser account id
    :param access_token: API access token for that account
    :param flag: channel tag stored with each row ('GDT' or 'MP')
    :param campaign_ids: comma-separated campaign-id string, used as an IN filter
    :param dt: data date stored with each row
    """
    path = 'campaigns/get'
    fields = ('campaign_id', 'campaign_name', 'configured_status', 'campaign_type',
              'promoted_object_type', 'daily_budget', 'budget_reach_date', 'created_time',
              'last_modified_time', 'speed_mode', 'is_deleted')
    url = 'https://api.e.qq.com/v1.3/' + path
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "filtering": [{
                "field": "campaign_id",
                "operator": "IN",
                "values": campaign_ids.split(',')
            }],
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False
        }
        # the API expects non-string values JSON-encoded in the query string
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        # code 11017 is the rate limit: back off and retry the same page
        while True:
            r = requests.get(url, params=parameters).json()
            if r['code'] == 11017:
                time.sleep(61)
            else:
                break
        # logging.info(r)
        if not r.get("data"):
            # BUG FIX: r['data'] was read unconditionally before, crashing on error replies
            break
        for i in r['data']['list']:
            li.append((str(i['campaign_id']), i['campaign_name'], i['configured_status'],
                       i['campaign_type'], i['promoted_object_type'], i['daily_budget'],
                       i.get('budget_reach_date'),
                       DateUtils.stamp_to_str(i['created_time']),
                       DateUtils.stamp_to_str(i['last_modified_time']),
                       i.get('speed_mode'), i.get('is_deleted'),
                       account_id, flag, dt))
        # BUG FIX: pagination used to advance past total_page and issue one extra request
        total_page = r['data']['page_info']['total_page']
        if page >= total_page:
            break
        page += 1
    # NOTE: MP accounts do not return speed_mode, is_deleted or budget_reach_date
    if len(li) > 0:
        logging.info(f"{account_id}有计划:" + str(len(li)))
        sql = "replace into campaign_info values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        db.quchen_text.executeMany(sql, li)
    db.close()
def get_adcreatives(account_id, access_token, flag, adc_ids, dt):
    """Pull ad-creative meta data for one account and upsert into adcreative_info.

    API: https://developers.e.qq.com/docs/api/adsmanagement/adcreatives/adcreatives_get?version=1.3

    :param flag: 'MP' (WeChat) and 'GDT' creatives are flattened differently
    :param adc_ids: comma-separated adcreative-id string, used as an IN filter
    :param dt: data date stored with each row
    """
    url = 'https://api.e.qq.com/v1.1/adcreatives/get'
    li = []
    page = 1
    logging.info(f"{account_id}开始获取创意")
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': ('campaign_id', 'adcreative_id', 'adcreative_name', 'adcreative_elements',
                       'promoted_object_type', 'page_type', 'page_spec', 'link_page_spec',
                       'universal_link_url', 'promoted_object_id', 'site_set'),
            "filtering": [{
                "field": "adcreative_id",
                "operator": "IN",
                "values": adc_ids.split(',')
            }],
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False
        }
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            # BUG FIX: timeout/connection errors used to escape this retry loop and kill the task
            try:
                h = requests.get(url, params=parameters, timeout=1)
            except requests.RequestException:
                h = None
            if h is not None and h.status_code == 200:
                r = h.json()
                break
            time.sleep(1)
            logging.info("爬取失败 等待1s")
        logging.info(f"{account_id}采集到创意")
        if 'data' not in r.keys():
            break
        for i in r['data']['list']:
            # BUG FIX: is_video was initialised once per page, so a single video
            # creative marked every later creative on that page as video too
            is_video = 0
            if flag == 'MP':
                if len(i['adcreative_elements']) > 0:
                    d = i['adcreative_elements']
                    title = d.get('title', '')
                    description = d.get('description', '')
                    if 'image' in d.keys():
                        image = d.get('image', '')
                    elif 'image_list' in d.keys():
                        image = ','.join(d.get('image_list'))
                    elif 'video' in d.keys():
                        image = d['video']
                        is_video = 1
                    else:
                        image = ''
                else:
                    # BUG FIX: description was left unbound (or stale) on this path
                    title = image = description = ''
                li.append((
                    i['adcreative_id'], i['adcreative_name'], i['campaign_id'], image, title,
                    i.get('promoted_object_type', ''), i.get('page_type', ''),
                    i['page_spec'].get('page_id', ''), i.get('promoted_object_id', ''),
                    '', description, 'MP', account_id, dt, is_video
                ))
            else:
                if len(i['adcreative_elements']) > 0:
                    d = i['adcreative_elements']
                    if 'image' in d.keys():
                        image = d['image']
                    elif 'element_story' in d.keys():
                        image = ','.join([x['image'] for x in d['element_story']])
                    else:
                        image = ''
                    title = d.get('title', '')
                    description = d.get('description', '')
                else:
                    image = title = description = ''
                li.append((
                    i['adcreative_id'], i['adcreative_name'], i['campaign_id'], image, title,
                    i.get('promoted_object_type', ''), i.get('page_type', ''),
                    i['page_spec'].get('page_id', ''), i.get('promoted_object_id', ''),
                    ','.join(i['site_set']), description, 'GDT', account_id, dt, is_video
                ))
        total_page = r['data']['page_info']['total_page']
        if total_page > page:
            page += 1
        else:
            break
    logging.info(f"{account_id}创意分析结束")
    logging.info(f"{account_id}获取创意,结束")
    if len(li) > 0:
        logging.info(f"{account_id}有创意:" + str(len(li)))
        sql = 'replace into adcreative_info values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '
        db.quchen_text.executeMany(sql, li)


def _media_info_get(account_id, access_token, media_ids, interface, filter_field,
                    id_field, table, label):
    """Shared worker for images_info_get / video_info_get.

    Pages through a media listing endpoint and upserts
    (id, width, height, signature, preview_url) rows into *table*.
    """
    fields = (id_field, 'width', 'height', 'file_size', 'signature', 'preview_url')
    url = 'https://api.e.qq.com/v1.3/' + interface
    page = 1
    li = []
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "filtering": [{
                "field": filter_field,
                "operator": "IN",
                "values": media_ids.split(',')
            }],
            "page": page,
            "page_size": 100
        }
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            h = requests.get(url, params=parameters)
            if h.status_code == 200:
                r = h.json()
                break
            time.sleep(1)
            logging.info("请求出错 等待1s..")
        if 'data' not in r.keys():
            break
        li.extend(r['data']['list'])
        total_page = r['data']['page_info']['total_page']
        if total_page > page:
            page += 1
        else:
            break
    data = [(i[id_field], i['width'], i['height'], i['signature'], i['preview_url']) for i in li]
    logging.info(f"{account_id} 有{label}:" + str(len(li)))
    if len(li) > 0:
        # 'value' (not 'values') kept verbatim from the original statements
        db.quchen_text.executeMany(f"replace into {table} value (%s,%s,%s,%s,%s)", data)
    db.close()


def images_info_get(account_id, access_token, image_ids):
    """Fetch image meta data and upsert into image_info.

    API: https://developers.e.qq.com/docs/api/business_assets/image/images_get?version=1.3
    """
    _media_info_get(account_id, access_token, image_ids,
                    interface='images/get', filter_field='image_id',
                    id_field='image_id', table='image_info', label='图片')


def video_info_get(account_id, access_token, image_ids):
    """Fetch video meta data and upsert into video_info.

    API: https://developers.e.qq.com/docs/api/business_assets/video/videos_get?version=1.3
    Note: the endpoint filters on 'media_id' while result rows carry 'video_id'.
    """
    _media_info_get(account_id, access_token, image_ids,
                    interface='videos/get', filter_field='media_id',
                    id_field='video_id', table='video_info', label='视频')


def ad_info():
    """Fan get_ad_info out over every known account with a thread pool, then
    bulk-upsert the collected rows into ad_info."""
    accounts = db.quchen_text.getData("""
    select account_id,access_token,name channel,'GDT' type from advertiser_qq where name !='' or name is not null 
    union 
    select account_id,access_token,name channel,'MP' type from advertiser_vx where name !='' or name is not null """)
    total_data = []
    # NOTE(review): get_ad_info takes (account_id, access_token, flag, ad_ids, dt),
    # but only four arguments are submitted here, so each task raises TypeError
    # inside the pool (silently swallowed by submit()). Looks like stale code — confirm.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:  # 'with' guarantees shutdown(wait=True)
        for row in accounts:
            account_id = row[0]
            access_token = row[1]
            ad_type = row[3]  # renamed from 'type' to stop shadowing the builtin
            executor.submit(get_ad_info, account_id, access_token, ad_type, total_data)
    logging.info(len(total_data))
    if len(total_data) > 0:
        sql = "replace into ad_info values(%s,%s,%s,%s,%s,%s,%s) "
        db.quchen_text.executeMany(sql, total_data)


def get_ad_info(account_id, access_token, flag, ad_ids, dt):
    """Pull basic ad attributes for one account and upsert into ad_info.

    API: https://developers.e.qq.com/docs/apilist/ads/ad?version=1.3#a3

    :param ad_ids: comma-separated ad-id string, used as an IN filter
    """
    path = 'ads/get'
    fields = ('ad_id', 'ad_name', 'adcreative_id', 'adgroup_id', 'campaign_id')
    url = 'https://api.e.qq.com/v1.3/' + path
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "filtering": [{
                "field": "ad_id",
                "operator": "IN",
                "values": ad_ids.split(',')
            }],
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False
        }
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            r = requests.get(url, params=parameters).json()
            if r['code'] == 11017:  # rate limited: wait out the window, retry same page
                time.sleep(61)
            else:
                break
        if not r.get("data"):
            # BUG FIX: r['data'] was read unconditionally before, crashing on error replies
            break
        for i in r['data']['list']:
            li.append((str(i['ad_id']), i['ad_name'], i['adcreative_id'], i['campaign_id'],
                       i['adgroup_id'], account_id, flag, dt))
        # BUG FIX: pagination used to advance past total_page and issue one extra request
        total_page = r['data']['page_info']['total_page']
        if page >= total_page:
            break
        page += 1
    if len(li) > 0:
        logging.info(f"{account_id}有广告:" + str(len(li)))
        sql = "replace into ad_info values(%s,%s,%s,%s,%s,%s,%s,%s) "
        db.quchen_text.executeMany(sql, li)
    db.close()


def get_ad_cost_day(account_id, access_token, flag, st, et):
    """Dispatch the daily-cost pull to the MP or GDT implementation.

    :param st: start date (inclusive), 'YYYY-MM-DD'
    :param et: end date (inclusive), 'YYYY-MM-DD'
    """
    if flag == 'MP':
        ad_cost_day_mp(account_id, access_token, st, et)
    else:
        ad_cost_day_gdt(account_id, access_token, st, et)


def ad_cost_day_gdt(account_id, access_token, st, et):
    """Pull the GDT per-ad daily cost report and upsert rows with cost > 0.

    API: https://developers.e.qq.com/docs/api/insights/ad_insights/daily_reports_get?version=1.3
    Amounts come back in cents; they are stored divided by 100.
    """
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'ctr', 'follow_count',
              'web_order_count', 'order_amount')
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "group_by": ['ad_id', 'date'],
            "level": 'REPORT_LEVEL_AD',
            "page": page,
            "page_size": 1000,
            "date_range": {
                "start_date": st,
                "end_date": et
            }
        }
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            r = requests.get(url, params=parameters).json()
            if r['code'] == 11017:  # rate limited
                time.sleep(61)
            else:
                break
        if not r.get("data"):
            # BUG FIX: total_page was read from r['data'] even when 'data' was absent
            break
        for i in r['data']['list']:
            if i['cost'] > 0:
                li.append((i['date'], i['ad_id'], i['adgroup_id'], i['cost'] / 100,
                           i['view_count'], i['ctr'] * i['view_count'], i['follow_count'],
                           i['web_order_count'], i['order_amount'] / 100, account_id, 'GDT'))
        total_page = r['data']['page_info']['total_page']
        if page >= total_page:
            break
        page += 1
    if len(li) > 0:
        logging.info(f"{account_id} have ad cost :{len(li)} ")
        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li)
    db.close()


def ad_cost_day_mp(account_id, access_token, st, et):
    """Pull the MP (WeChat) per-ad daily cost report and upsert rows with cost > 0.

    Amounts come back in cents; they are stored divided by 100.
    """
    url = 'https://api.e.qq.com/v1.3/daily_reports/get'
    fields = ('date', 'ad_id', 'adgroup_id', 'cost', 'view_count', 'valid_click_count',
              'official_account_follow_count', 'order_count', 'order_amount')
    li = []
    page = 1
    while True:
        parameters = {
            'access_token': access_token,
            'timestamp': int(time.time()),
            'nonce': str(time.time()) + str(random.randint(0, 999999)),
            'fields': fields,
            "account_id": account_id,
            "level": 'REPORT_LEVEL_AD_WECHAT',
            "page": page,
            "page_size": 1000,
            "date_range": {
                "start_date": st,
                "end_date": et
            }
        }
        for k in parameters:
            if type(parameters[k]) is not str:
                parameters[k] = json.dumps(parameters[k])
        while True:
            r = requests.get(url, params=parameters).json()
            if r['code'] == 11017:  # rate limited
                time.sleep(61)
            else:
                break
        if not r.get("data"):
            # BUG FIX: total_page was read from r['data'] even when 'data' was absent
            break
        for i in r['data']['list']:
            if i['cost'] > 0:
                li.append((i['date'], i['ad_id'], i['adgroup_id'], i['cost'] / 100,
                           i['view_count'], i['valid_click_count'],
                           i['official_account_follow_count'], i['order_count'],
                           i['order_amount'] / 100, account_id, 'MP'))
        total_page = r['data']['page_info']['total_page']
        if page >= total_page:
            break
        page += 1
    if len(li) > 0:
        logging.info(f"{account_id} have ad cost :{len(li)} ")
        db.quchen_text.executeMany('replace into ad_cost_day values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', li)
    db.close()


def daily_reports_get(access_token, account_id, level, start_date, end_date, fields):
    """Fetch one page (up to 1000 rows) of the WeChat daily report and return the raw JSON reply.

    Retries every second until the HTTP status is 200.
    """
    interface = 'daily_reports/get'
    url = 'https://api.e.qq.com/v1.3/' + interface
    parameters = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
        "account_id": account_id,
        "level": level,
        "date_range": {
            "start_date": start_date,
            "end_date": end_date
        },
        "page": 1,
        "page_size": 1000
    }
    for k in parameters:
        if type(parameters[k]) is not str:
            parameters[k] = json.dumps(parameters[k])
    while True:
        r = requests.get(url, params=parameters)
        if r.status_code == 200:
            break
        time.sleep(1)
        logging.info("请求出错 等待1s..")
    return r.json()


def daily_qq_reports_get(access_token, account_id, compaign_id, level, start_date, end_date, fields):
    """Fetch one page (up to 1000 rows) of the GDT daily report for a single
    campaign and return the raw JSON reply.

    ('compaign_id' spelling preserved — callers may pass it by keyword.)
    """
    interface = 'daily_reports/get'
    url = 'https://api.e.qq.com/v1.1/' + interface
    parameters = {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
        "account_id": account_id,
        "filtering": [{
            "field": "campaign_id",
            "operator": "EQUALS",
            "values": [compaign_id]
        }],
        "level": level,
        "date_range": {
            "start_date": start_date,
            "end_date": end_date
        },
        "page": 1,
        "page_size": 1000
    }
    for k in parameters:
        if type(parameters[k]) is not str:
            parameters[k] = json.dumps(parameters[k])
    r = requests.get(url, params=parameters)
    return r.json()


def mysql_insert_adcreative(data):
    """Bulk REPLACE rows into the adcreative table; rolls back on any failure.

    :param data: iterable of 10-tuples matching the column list in the SQL below
    """
    # NOTE(review): hard-coded credentials — move to config/env.
    # BUG FIX: keyword args (positional connect() args were removed in PyMySQL 1.0)
    conn = pymysql.connect(host='rm-bp1c9cj79872tx3aaro.mysql.rds.aliyuncs.com',
                           user='superc', password='Cc719199895', database='quchen_text')
    cursor = conn.cursor()
    sql = 'replace into adcreative (campaign_id,adcreative_id,adcreative_name,image_id,title,promoted_object_type,page_type,page_id,link_page_id,promoted_object_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
    try:
        cursor.executemany(sql, data)
        conn.commit()
        logging.info('insert [adcreative] ' + str(len(data)))
    except Exception:  # BUG FIX: was a bare except
        conn.rollback()
        logging.info('insert [adcreative] defeat')
    finally:
        conn.close()  # BUG FIX: connection used to leak


if __name__ == '__main__':
    account_id = 19206910
    access_token = '89079ccc8db047b078a0108e36a7e276'
    # account_id2 = 14709511
    access_token2 = 'e87f7b6f860eaeef086ddcc9c3614678'
    get_ad_cost_day(account_id, access_token, 'MP', '2021-04-09', '2021-04-09')
    # get_adcreatives(account_id,access_token,'MP','3187867673','2021-04-09')