from selenium import webdriver from selenium.webdriver import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from wechat_action.create_ad_plan_idea import IdeaAction from wechat_action.human_ad import HumanAd from selenium.webdriver import ChromeOptions from selenium.webdriver.common.keys import Keys from functools import wraps from wechat_action import login_ad import logging import json import time import re import requests import random class WechatApi(): def __init__(self, log_ad): self.log_ad = log_ad self.driver = self.log_ad.driver self.human_info_list = [] def api_get_name(self): # 微信公众号相关内容获取,现在基本不使用,用get_human_info为主 def _api_get_name(self, service_name): WebDriverWait(self.driver, 100).until( lambda driver: True if service_name in self.driver.page_source and len( re.findall('g_tk=(\d+)', self.driver.page_source)) else False) cookie_dict = self.log_ad.get_cookie(self.driver) url_token = re.findall('g_tk=(\d+)', self.driver.page_source) url_token = url_token[0] # 得到公众号 wechat_names_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'.format( token=url_token, time_=int(time.time())) rsp = requests.get(url=wechat_names_url, cookies=cookie_dict) print(service_name, rsp.text) WebDriverWait(self.driver, 100).until( lambda driver: True if len( re.findall('g_tk=(\d+)', self.driver.page_source)) else False) url_token = re.findall('g_tk=(\d+)', self.driver.page_source) url_token = url_token[0] wechat_names_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'.format( token=url_token, time_=int(time.time())) cookie_dict = self.log_ad.get_cookie(self.driver) rsp = requests.get(url=wechat_names_url, cookies=cookie_dict) service_name = self.driver.find_element_by_xpath('//*[@id="root"]/div/header/div/div[3]/div/div[1]').text print(service_name, rsp.text) service_name = '' self.service_loop(_api_get_name, {'self': self, 'service_name': service_name}) def get_human_info(self, err_num=0): try: def _get_human_info(self, service_name): # 耗时一秒以内 # self.driver.get('https://a.weixin.qq.com/client') WebDriverWait(self.driver, 100).until( lambda driver: True if service_name in self.driver.page_source and len( re.findall('g_tk=(\d+)', self.driver.page_source)) else False) cookie_dict = self.log_ad.get_cookie(self.driver) url_token = re.findall('g_tk=(\d+)', self.driver.page_source)[0] # 得到各个appid wechat_names_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'.format( token=url_token, time_=int(time.time())) rsp = requests.get(url=wechat_names_url, cookies=cookie_dict) print(wechat_names_url) print(rsp.text) # 普通用户的需要在1s内获取到 # 得到wechat_token for i in rsp.json()['list']: wechat_wxname = i['wxname'] wechat_name = i['nickname'] wechat_id = i['appid'] wechat_tran_url = 'http://a.weixin.qq.com/cgi-bin/agency/redirect_mp?appid={wechat_id}&g_tk={token}&mgr_type=1'.format( token=url_token, wechat_id=wechat_id) session = requests.session() rsp = session.get(wechat_tran_url, cookies=cookie_dict) print(rsp.url) token_id = re.findall('token=(\d+)', rsp.url)[0] print(token_id) # 得到人群包 human_url = 'https://mp.weixin.qq.com/promotion/dmpmgr?action=readlist&page=1&page_size=100&token={wechat_token}&appid=&spid=&_={time_}'.format( wechat_token=token_id, time_=int(time.time())) print(human_url) rsp = session.get(url=human_url) print(rsp.text) res_json = rsp.json() res_json['service_name'] = service_name res_json['wechat_name'] = wechat_name res_json['wxname'] = wechat_wxname res_json['appid'] = wechat_id self.human_info_list.append(res_json) print(self.human_info_list) print(json.dumps(self.human_info_list)) time.sleep(random.uniform(3, 5)) service_name = self.driver.find_element_by_xpath('//*[@id="root"]/div/header/div/div[3]/div/div[1]').text _get_human_info(self, service_name=service_name) self.service_loop(_get_human_info, {'self': self, 'service_name': service_name}) self.log_ad.cookies_save(self.log_ad) return self.human_info_list except Exception as e: logging.error(str(e)) if err_num<3: self.human_info_list=[] return self.get_human_info(err_num=err_num + 1) def service_loop(self, function, kwargs): def click_service_change(): self.driver.switch_to.window(self.driver.window_handles[-1]) WebDriverWait(self.driver, 10).until( lambda x: self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div')) service_name_button = self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div') WebDriverWait(self.driver, 10).until( lambda x: (service_name_button.is_displayed() and service_name_button.is_enabled())) server_button = self.driver.find_element_by_css_selector( '#root > div > header > div > div.CoreLayout__account-2lIr0 > div') ActionChains(self.driver).move_to_element(server_button).click().perform() # server_button.click() time.sleep(random.uniform(0.1, 0.2)) self.driver.find_element_by_css_selector( '#root > div > div.CoreLayout__headerDropdown-3xWkD > div > div:nth-child(1) > button').click() time.sleep(random.uniform(0.1, 0.2)) # 得到所有service name service_name_all = set() service_name_used = set() self.driver.execute_script(''' window.scroll(0,1000000); ''') # time.sleep(random.uniform(3, 5)) click_service_change() # WebDriverWait(self.driver, 10).until( # lambda x: self.driver.find_element_by_xpath('//*[@id="root"]/div/span/div/main/div/div[1]/div/h3')) # self.driver.find_element_by_xpath('//*[@id="root"]/div/span/div/main/div/div[1]/div/h3').click() # time.sleep(random.uniform(0.1,0.2)) service_names = self.driver.find_elements_by_class_name('CoreLayout__headerDropdownItem-X4S98') for _ in service_names: service_name_all.add(_.text) # 第一个service_name service_name = self.driver.find_element_by_xpath('//*[@id="root"]/div/header/div/div[3]/div/div[1]').text service_name_used.add(service_name) # 循环使用完service_name为止 for i in range(len(service_name_all) - 1): service_names = self.driver.find_elements_by_xpath('//*[@class="CoreLayout__headerDropdownItem-X4S98"]/p') for _ in service_names: print(_.text) for service_num in range(len(service_names)): _ = service_names[service_num] if _.text not in service_name_used: choice_service = _ choice_service_name = _.text service_name_used.add(_.text) # self.driver.execute_script(''' # var e_one=document.getElementsByClassName('CoreLayout__headerDropdown-3xWkD')[0]; e_one.scroll(10000,100000); # e_one.scroll(0,{y_localtion}); # '''.format(y_localtion=service_num * 45)) ActionChains(self.driver).move_to_element(choice_service).click().perform() # choice_service.click() if 'service_name' in kwargs.keys(): kwargs['service_name'] = choice_service_name function(**kwargs) break # time.sleep(random.uniform(3, 5)) click_service_change() if __name__ == "__main__": pass