from selenium import webdriver from selenium.webdriver import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from wechat_action.send_ad_idea import IdeaAction from wechat_action.human_ad import HumanAd from selenium.webdriver import ChromeOptions from selenium.webdriver.common.keys import Keys import time import re import requests import random class WechatApi(): def __init__(self): self.driver = self.get_driver() self.log_in() def get_driver(self): options = ChromeOptions() # 防止selenium快速崩坏 options.add_argument("--disable-dev-shm-usage") # driver = webdriver.Remote( # command_executor='http://192.168.7.245:4444/wd/hub', # options=options) driver = webdriver.Chrome() driver.maximize_window() return driver def log_in(self): self.driver.get('https://a.weixin.qq.com/index.html') img_selector = 'body > div.old-template > div > div > div.waiting.panelContent > div.wrp_code > img' 'body > div.old-template > div > div > div.waiting.panelContent > div.wrp_code > img' frame_login = self.driver.find_element_by_xpath('//*[@id="login_container"]/iframe') self.driver.switch_to.frame(frame_login) # WebDriverWait(driver, 3).until(EC.invisibility_of_element_located((By.CSS_SELECTOR, img_selector))) # time.sleep(3) img_url = self.driver.find_element_by_css_selector(img_selector) print('img', img_url.get_attribute('src')) WebDriverWait(self.driver, 100).until(lambda driver: self.driver.find_elements_by_link_text('广告投放')) def get_cookie(self): WebDriverWait(self.driver, 100).until( lambda x: [True for _ in self.driver.get_cookies() if 'token_ticket' == _['name']] ) cookies = self.driver.get_cookies() cookie_dict = {} for _ in cookies: print(_) cookie_dict[_['name']] = _['value'] return cookie_dict def wechat_cookie_save(self): # TODO:wechat cookie 存入数据库 pass def get_wechat_name(self): def api_get_name(service_name): WebDriverWait(self.driver, 100).until(lambda driver: self.driver.find_elements_by_link_text('广告投放')) cookie_dict = self.get_cookie() url_token = re.findall('g_tk=(\d+)', self.driver.page_source) url_token = url_token[0] # 得到公众号 wechat_names_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'.format( token=url_token, time_=int(time.time())) rsp = requests.get(url=wechat_names_url, cookies=cookie_dict) print(service_name, rsp.text) # 得到所有service name service_name_all = set() service_name_used = set() self.driver.execute_script(''' window.scroll(0,1000000); ''') self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div').click() time.sleep(random.uniform(0.1, 0.2)) self.driver.find_element_by_css_selector( '#root > div > div.CoreLayout__headerDropdown-3xWkD > div > div:nth-child(1) > button').click() time.sleep(random.uniform(0.1, 0.2)) service_names = self.driver.find_elements_by_class_name('CoreLayout__headerDropdownItem-X4S98') for _ in service_names: service_name_all.add(_.text) # 第一个service_name server_name = self.driver.find_element_by_xpath('//*[@id="root"]/div/header/div/div[3]/div/div[1]').text service_name_used.add(server_name) # 循环使用完service_name为止 for i in range(len(service_name_all)): service_names = self.driver.find_elements_by_xpath('//*[@class="CoreLayout__headerDropdownItem-X4S98"]/p') for _ in service_names: #TODO:服务商太多可能没法点击 if _.text not in service_name_used: choice_service = _ choice_service_name = _.text service_name_used.add(_.text) choice_service.click() api_get_name(choice_service_name) break self.driver.switch_to.window(self.driver.window_handles[-1]) WebDriverWait(self.driver, 10).until( lambda x: self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div')) service_name_button = self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div') WebDriverWait(self.driver, 10).until( lambda x: (service_name_button.is_displayed() and service_name_button.is_enabled())) #TODO:需要查看为什么无法点击 self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div').click() time.sleep(random.uniform(0.1, 0.2)) self.driver.find_element_by_css_selector( '#root > div > div.CoreLayout__headerDropdown-3xWkD > div > div:nth-child(1) > button').click() time.sleep(random.uniform(0.1, 0.2)) def get_human_info(self): # TODO:多服务商 # 耗时一秒以内 self.driver.get('https://a.weixin.qq.com/client') WebDriverWait(self.driver, 100).until(lambda driver: driver.find_elements_by_link_text('广告投放')) cookie_dict = self.get_cookie() url_token = re.findall('g_tk=(\d+)', self.driver.page_source) url_token = url_token[0] # 得到各个appid wechat_names_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'.format( token=url_token, time_=int(time.time())) rsp = requests.get(url=wechat_names_url, cookies=cookie_dict) wechat_id_list = [] for i in rsp.json()['list']: wechat_id_list.append(i['appid']) print(rsp.text) # 普通用户的需要在1s内获取到 # 得到wechat_token time_one = time.time() for _ in range(25): for i in wechat_id_list: wechat_id = i wechat_tran_url = 'http://a.weixin.qq.com/cgi-bin/agency/redirect_mp?appid={wechat_id}&g_tk={token}&mgr_type=1'.format( token=url_token, wechat_id=wechat_id) session = requests.session() rsp = session.get(wechat_tran_url, cookies=cookie_dict) # print(rsp.url) token_id = re.findall('token=(\d+)', rsp.url)[0] print(token_id) # 得到人群包 human_url = 'https://mp.weixin.qq.com/promotion/dmpmgr?action=readlist&page=1&page_size=100&token={wechat_token}&appid=&spid=&_={time_}'.format( wechat_token=token_id, time_=int(time.time())) print(human_url) rsp = session.get(url=human_url) print(rsp.text) time_two = time.time() print('耗时', time_two - time_one) if __name__ == "__main__": pass