#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Collectors for the Tencent advertising API (api.e.qq.com, v1.1).

Created on Fri Jun  5 17:00:45 2020
@author: c
"""
import json
import random
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import pymysql
import requests

import token_list as tl
from util import date_util


def _auth_params(access_token, fields):
    """Return the auth/anti-replay parameters required on every API call."""
    return {
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
    }


def _as_query(parameters):
    """JSON-encode every non-string value in *parameters*, in place.

    The API expects each query argument as a string, so tuples/lists/
    dicts/bools are sent in their JSON form.
    """
    for k in parameters:
        if not isinstance(parameters[k], str):  # idiom fix: was `type(...) is not str`
            parameters[k] = json.dumps(parameters[k])
    return parameters


def adcreatives_get(access_token, account_id, fields):
    """Return every ad creative of *account_id* as a list of dicts.

    Walks all result pages (100 items each).  The first page is retried
    once per second until the server answers HTTP 200 — no retry cap,
    preserved from the original code.  Returns [] when the response has
    no 'data' key (API error).
    """
    url = 'https://api.e.qq.com/v1.1/adcreatives/get'

    def _params(page):
        # A fresh timestamp/nonce is generated for every request.
        p = {
            "account_id": account_id,
            "page": page,
            "page_size": 100,
            "is_deleted": False,
        }
        p.update(_auth_params(access_token, fields))
        return _as_query(p)

    rows = []
    while True:
        h = requests.get(url, params=_params(1))
        if h.status_code == 200:
            r = h.json()
            break
        time.sleep(1)
        print("爬取失败 等待1s")
    if 'data' in r:
        rows += r['data']['list']
        total_page = r['data']['page_info']['total_page']
        # Follow-up pages are fetched without the retry loop (as before).
        for page in range(2, total_page + 1):
            r = requests.get(url, params=_params(page)).json()
            if 'data' in r:
                rows += r['data']['list']
    return rows


def ads_get(access_token, account_id, fields):
    """Return the raw JSON of the first page (10 items) of ads."""
    url = 'https://api.e.qq.com/v1.1/ads/get'
    p = {
        "account_id": account_id,
        "page": 1,
        "page_size": 10,
        "is_deleted": False,
    }
    p.update(_auth_params(access_token, fields))
    return requests.get(url, params=_as_query(p)).json()


def wechat_pages_get(access_token, account_id, page_id, fields):
    """Return the raw JSON describing one WeChat native page (*page_id*)."""
    url = 'https://api.e.qq.com/v1.1/wechat_pages/get'
    p = {
        "account_id": account_id,
        "filtering": [
            {
                "field": "page_id",
                "operator": "EQUALS",
                "values": [page_id],
            }
        ],
        "page": 1,
        "page_size": 10,
    }
    p.update(_auth_params(access_token, fields))
    return requests.get(url, params=_as_query(p)).json()


def adgroups_get(access_token, account_id, fields):
    """Return the raw JSON of one page of ad groups.

    NOTE(review): ``"page": 4`` looks like leftover debugging — only the
    fourth result page is fetched.  Kept as-is to preserve behaviour;
    confirm with the owner before changing it to 1 (or full pagination).
    """
    url = 'https://api.e.qq.com/v1.1/adgroups/get'
    p = {
        "account_id": account_id,
        "page": 4,
        "page_size": 100,
        "is_deleted": False,
    }
    p.update(_auth_params(access_token, fields))
    return requests.get(url, params=_as_query(p)).json()
def _api_params(access_token, fields, extra):
    """Build and string-encode the query dict for one api.e.qq.com call.

    *extra* carries the endpoint-specific arguments; the shared
    access_token/timestamp/nonce/fields keys are merged on top and every
    non-string value is JSON-encoded (the API expects strings).
    """
    p = dict(extra)
    p.update({
        'access_token': access_token,
        'timestamp': int(time.time()),
        'nonce': str(time.time()) + str(random.randint(0, 999999)),
        'fields': fields,
    })
    for k in p:
        if not isinstance(p[k], str):
            p[k] = json.dumps(p[k])
    return p


def images_get(access_token, account_id, li):
    """Fetch every image of *account_id* and append
    (image_id, preview_url, account_id) tuples to the shared list *li*.

    Runs inside a ThreadPoolExecutor, so results are collected through
    the *li* out-parameter (list.append is atomic in CPython) instead of
    a return value.
    """
    fields = ('image_id', 'preview_url')
    url = 'https://api.e.qq.com/v1.1/images/get'
    rows = []
    # Retry the first page until HTTP 200 (no retry cap — preserved).
    while True:
        h = requests.get(url, params=_api_params(access_token, fields, {
            "account_id": account_id, "page": 1, "page_size": 100}))
        if h.status_code == 200:
            r = h.json()
            break
        time.sleep(1)
        print("请求出错 等待1s..")
    if 'data' in r:
        rows += r['data']['list']
        total_page = r['data']['page_info']['total_page']
        for page in range(2, total_page + 1):
            r = requests.get(url, params=_api_params(access_token, fields, {
                "account_id": account_id, "page": page,
                "page_size": 100})).json()
            if 'data' in r:
                rows += r['data']['list']
    for img in rows:
        img["account_id"] = account_id
        li.append(tuple(img.values()))


def campaigns_get(access_token, account_id, fields):
    """Return every (non-deleted) campaign of *account_id*, all pages."""
    url = 'https://api.e.qq.com/v1.1/campaigns/get'
    rows = []
    r = requests.get(url, params=_api_params(access_token, fields, {
        "account_id": account_id, "page": 1, "page_size": 100,
        "is_deleted": False})).json()
    if 'data' in r:
        rows += r['data']['list']
        total_page = r['data']['page_info']['total_page']
        for page in range(2, total_page + 1):
            r = requests.get(url, params=_api_params(access_token, fields, {
                "account_id": account_id, "page": page, "page_size": 100,
                "is_deleted": False})).json()
            if 'data' in r:
                rows += r['data']['list']
    return rows


def daily_reports_get(access_token, account_id, level, start_date, end_date, fields):
    """Return the raw JSON of a WeChat daily report (up to 1000 rows).

    Retries once per second until the server answers HTTP 200 (no retry
    cap — preserved from the original code).
    """
    url = 'https://api.e.qq.com/v1.1/daily_reports/get'
    params = _api_params(access_token, fields, {
        "account_id": account_id,
        "level": level,
        "date_range": {"start_date": start_date, "end_date": end_date},
        "page": 1,
        "page_size": 1000,
    })
    while True:
        r = requests.get(url, params=params)
        if r.status_code == 200:
            return r.json()
        time.sleep(1)
        print("请求出错 等待1s..")


def daily_qq_reports_get(access_token, account_id, compaign_id, level,
                         start_date, end_date, fields):
    """Return the raw JSON of one campaign's GDT daily report."""
    url = 'https://api.e.qq.com/v1.1/daily_reports/get'
    params = _api_params(access_token, fields, {
        "account_id": account_id,
        "filtering": [{"field": "campaign_id", "operator": "EQUALS",
                       "values": [compaign_id]}],
        "level": level,
        "date_range": {"start_date": start_date, "end_date": end_date},
        "page": 1,
        "page_size": 1000,
    })
    return requests.get(url, params=params).json()


def _db_connect():
    """Open a connection to the reporting MySQL database.

    NOTE(security): credentials are hard-coded in source control; they
    should be moved to configuration / environment variables.
    """
    return pymysql.connect(
        host='rm-bp1c9cj79872tx3aaro.mysql.rds.aliyuncs.com',
        user='superc',
        password='Cc719199895',
        database='quchen_text',
    )


def _replace_many(sql, data, ok_msg, fail_msg):
    """executemany *sql* with *data*, commit, and log the outcome.

    Fixes over the original copies of this code: the bare ``except:`` is
    narrowed to Exception and the error is printed, and the connection is
    always closed (it used to leak).
    """
    db = _db_connect()
    try:
        cursor = db.cursor()
        cursor.executemany(sql, data)
        db.commit()
        if ok_msg:
            print(ok_msg, len(data))
    except Exception as e:
        db.rollback()
        print(fail_msg, e)
    finally:
        db.close()


def mysql_insert_daily_vx_campaign(data):
    """REPLACE daily WeChat campaign report rows into daily_vx_campaign."""
    sql = 'replace into daily_vx_campaign (account_id,date,campaign_id,view_count,cost,ctr,cpc,order_roi,thousand_display_price,valid_click_count,official_account_follow_count,conversions_count,official_account_follow_rate,conversions_rate,order_count,order_rate,order_unit_price,first_day_order_amount) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
    _replace_many(sql, data, 'insert [daily_vx_campaign]',
                  'insert [daily_vx_campaign] defeat')


def mysql_insert_campaign_vx(data):
    """REPLACE campaign rows into campaign_vx."""
    sql = 'replace into campaign_vx (campaign_id,campaign_name,configured_status,campaign_type,promoted_object_type,daily_budget,created_time,last_modified_time,account_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s)'
    _replace_many(sql, data, 'insert [campaign_vx] ',
                  'insert [campaign_vx] defeat')


def mysql_insert_adcreative(data):
    """REPLACE creative rows into adcreative."""
    sql = 'replace into adcreative (campaign_id,adcreative_id,adcreative_name,image_id,title,promoted_object_type,page_type,page_id,link_page_id,promoted_object_id) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
    _replace_many(sql, data, 'insert [adcreative] ',
                  'insert [adcreative] defeat')


def mysql_insert_image(data):
    """REPLACE image rows into image (no success log, as before)."""
    sql = 'replace into image (image_id,preview_url,account_id) values (%s,%s,%s)'
    _replace_many(sql, data, None, 'insert image defeat')


def v_data(x, st, et, q, r, p):
    """Worker: fetch one account's campaigns and its daily report.

    x     -- token record: (account_id, ..., access_token at index 2)
    st/et -- report window start/end as unix timestamps
    q     -- shared out-list of campaign_ids created/modified after *st*
    r     -- shared out-list of daily-report row tuples
    p     -- shared out-list of campaign row tuples
    """
    account_id = x[0]
    access_token = x[2]
    start_date = time.strftime("%Y-%m-%d", time.localtime(st))
    end_date = time.strftime("%Y-%m-%d", time.localtime(et))
    campaigns = campaigns_get(access_token, account_id, (
        'campaign_id', 'campaign_name', 'configured_status', 'campaign_type',
        'promoted_object_type', 'daily_budget', 'budget_reach_date',
        'created_time', 'last_modified_time', 'speed_mode', 'is_deleted'))
    for ll in campaigns:
        ll['account_id'] = account_id
        # created_time/last_modified_time are compared to the unix
        # timestamp *st* — presumably both are epoch seconds (TODO confirm).
        if ll['created_time'] > st or ll['last_modified_time'] > st:
            q.append(ll['campaign_id'])
        p.append(tuple(ll.values()))
    data_list = daily_reports_get(
        access_token, account_id, 'REPORT_LEVEL_CAMPAIGN_WECHAT',
        start_date, end_date, (
            'account_id', 'date', 'campaign_id', 'view_count', 'cost', 'ctr',
            'cpc', 'order_roi', 'thousand_display_price', 'valid_click_count',
            'official_account_follow_count', 'conversions_count',
            'official_account_follow_rate', 'conversions_rate', 'order_count',
            'order_rate', 'order_unit_price', 'first_day_order_amount'))
    if 'data' in data_list:
        for y in data_list['data']['list']:
            y['account_id'] = account_id
            r.append(tuple(y.values()))


def get_daily_vx_campaign(st, et):
    """Collect campaign lists + daily reports for every WeChat token in
    parallel, then persist both result sets."""
    r = []  # daily-report row tuples
    p = []  # campaign row tuples
    q = []  # recently-changed campaign ids (collected but unused here)
    t1 = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for x in tl.token_list_vx:
            executor.submit(v_data, x, st, et, q, r, p)
    print("获取投放日报数据 {} 广告推广{} cost:{}s".format(
        len(r), len(p), int(time.time() - t1)))
    mysql_insert_daily_vx_campaign(r)
    mysql_insert_campaign_vx(p)


def get_daily_qq_campaign(st, et):
    """Collect GDT (QQ) campaign daily reports.

    Fix: the original indexed ``data_list['data']`` before checking that
    'data' existed, so any API error raised KeyError.  The rows are now
    also returned (the original built them and silently discarded them —
    returning is backward-compatible since no caller used the old None).
    """
    r = ()
    for x in tl.token_list_qq:
        account_id = x[0]
        access_token = x[2]
        campaigns = campaigns_get(access_token, account_id, (
            'campaign_id', 'campaign_name', 'configured_status',
            'campaign_type', 'promoted_object_type', 'daily_budget',
            'budget_reach_date', 'created_time', 'last_modified_time',
            'speed_mode', 'is_deleted'))
        for ll in campaigns:
            data_list = daily_qq_reports_get(
                access_token, account_id, ll['campaign_id'],
                'REPORT_LEVEL_CAMPAIGN', st, et,
                ('account_id', 'date', 'campaign_id', 'view_count',
                 'thousand_display_price', 'valid_click_count', 'ctr', 'cpc',
                 'cost', 'order_roi'))
            if 'data' in data_list:
                if len(data_list['data']['list']) > 0:
                    print(data_list)
                for y in data_list['data']['list']:
                    r = r + (tuple(y.values()),)
    return r


def get_campaign_update_list():
    """Return (account_id, access_token) pairs whose campaigns were created
    or modified since the start of yesterday (UTC+8), and purge their
    cached adcreative rows so they get re-fetched.

    Fixes: *x* is initialised so a failed query no longer raises
    NameError; the pointless fetchall() after the DELETE is gone; the
    connection is closed.
    """
    db = _db_connect()
    cursor = db.cursor()
    # Midnight (UTC+8) of the previous day, as a unix timestamp.
    since = int((time.time() + 8 * 3600) // 86400 * 86400 - 8 * 3600 - 86400)
    sql = 'select distinct advertiser_vx.account_id,access_token from campaign_vx left join advertiser_vx on advertiser_vx.account_id = campaign_vx.account_id where created_time>=%s or last_modified_time>=%s'
    x = ()
    try:
        cursor.execute(sql, (since, since))
        db.commit()
        x = cursor.fetchall()
    except Exception as e:
        db.rollback()
        print('get_campaign_update_list select defeat', e)
    if len(x) > 0:
        # NOTE(review): account_ids are bound to a campaign_id filter here,
        # exactly as in the original — looks suspicious, confirm the schema.
        ids = [t[0] for t in x]
        try:
            cursor.executemany('delete from adcreative where campaign_id=%s', ids)
            db.commit()
        except Exception as e:
            db.rollback()
            print('get_campaign_update_list delete defeat', e)
    db.close()
    return x


def _page_ids(ll):
    """Extract (page_id, link_page_id) from one creative; either may be None."""
    page_id = None
    if 'page_spec' in ll and 'page_id' in ll['page_spec']:
        page_id = ll['page_spec']['page_id']
    link_page_id = None
    if 'link_page_spec' in ll and 'page_id' in ll['link_page_spec']:
        link_page_id = ll['link_page_spec']['page_id']
    return page_id, link_page_id


def adcreative_vx_data(x, r):
    """Worker: fetch one account's creatives and append row tuples to the
    shared list *r* (column order matches mysql_insert_adcreative).

    Fixes: the 'image_list' branch used ``r = r + ((y),)`` — adding a
    tuple to the list out-parameter raises TypeError, and rebinding the
    local would have discarded the rows anyway; it now appends.  It also
    read elements['title'] unguarded (KeyError when absent) while the
    sibling branch defaulted to '' — both now default to ''.
    """
    account_id = x[0]
    access_token = x[1]
    creatives = adcreatives_get(access_token, account_id, (
        'campaign_id', 'adcreative_id', 'adcreative_name',
        'adcreative_elements', 'promoted_object_type', 'page_type',
        'page_spec', 'link_page_spec', 'universal_link_url',
        'promoted_object_id'))
    for ll in creatives:
        elements = ll['adcreative_elements']
        if 'image_list' in elements:
            image_ids = elements['image_list']
        elif 'image' in elements:
            image_ids = [elements['image']]
        else:
            continue  # creative without image material: nothing to store
        page_id, link_page_id = _page_ids(ll)
        title = elements.get('title', '')
        for image_id in image_ids:  # one row per image
            r.append((ll['campaign_id'], ll['adcreative_id'],
                      ll['adcreative_name'], image_id, title,
                      ll['promoted_object_type'], ll['page_type'],
                      page_id, link_page_id, ll['promoted_object_id']))


def get_adcreative_vx():
    """Refresh adcreative rows for accounts whose campaigns changed."""
    tokens = get_campaign_update_list()
    r = []
    t1 = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for x in tokens:
            executor.submit(adcreative_vx_data, x, r)
    print("获取广告创意{}cost:{}s".format(len(r), int(time.time() - t1)))
    mysql_insert_adcreative(r)


def get_image_imformation():
    """Download image metadata for every WeChat token and store it.

    (The misspelled name is kept — callers use this spelling.)
    """
    li = []
    t1 = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for x in tl.token_list_vx:
            executor.submit(images_get, x[2], x[0], li)
    print("获取图片信息{}条 cost:{}s".format(len(li), int(time.time() - t1)))
    mysql_insert_image(li)


def start_all_job():
    """Run the whole pipeline for yesterday's data."""
    start_time = date_util.get_n_day(n=-1, is_timestamp=1)
    end_time = date_util.get_n_day(n=-1, is_timestamp=1)
    print("run[{0}] data".format(date_util.stamp_to_str(start_time)[:10]))
    print("线程数:", max_workers)
    print("开始获取计划日报----------")
    get_daily_vx_campaign(start_time, end_time)
    print("开始获取广告创意信息--------------")
    get_adcreative_vx()
    print("开始获取图片信息 ")
    get_image_imformation()
    print("success")


if __name__ == '__main__':
    print("============start at " + str(datetime.today())[:19] + "===================")
    # Thread-pool size read as a module-level global by the get_* jobs.
    max_workers = 100
    start_all_job()