123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
import json
import logging
import random
import re
import time

import requests
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
class WechatApi():
    """Collect WeChat advertising-agency data through a logged-in browser session.

    The object drives the Selenium browser owned by ``log_ad`` on
    ``a.weixin.qq.com``, reuses that browser's cookies for direct HTTP calls
    made with ``requests``, and iterates over every service account offered by
    the header drop-down of the page.

    Attributes:
        log_ad: Login helper supplied by the caller. This class uses its
            ``driver`` attribute and its ``get_cookie(driver)`` and
            ``cookies_save(...)`` callables.
        driver: The Selenium driver taken from ``log_ad.driver``.
        human_info_list: Audience-package responses accumulated by
            :meth:`get_human_info`, one dict per official account.
    """

    # ``g_tk`` token as it appears in the agency page source.
    _G_TK_PATTERN = re.compile(r'g_tk=(\d+)')
    # ``token`` query parameter of the URL reached after the redirect to mp.weixin.qq.com.
    _MP_TOKEN_PATTERN = re.compile(r'token=(\d+)')

    _DELIVERY_METRICS_URL = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk={token}&_={time_}'
    _REDIRECT_MP_URL = 'http://a.weixin.qq.com/cgi-bin/agency/redirect_mp?appid={wechat_id}&g_tk={token}&mgr_type=1'
    _AUDIENCE_LIST_URL = 'https://mp.weixin.qq.com/promotion/dmpmgr?action=readlist&page=1&page_size=100&token={wechat_token}&appid=&spid=&_={time_}'

    _CURRENT_SERVICE_XPATH = '//*[@id="root"]/div/header/div/div[3]/div/div[1]'
    _ACCOUNT_BUTTON_CSS = '#root > div > header > div > div.CoreLayout__account-2lIr0 > div'
    _SWITCH_BUTTON_CSS = '#root > div > div.CoreLayout__headerDropdown-3xWkD > div > div:nth-child(1) > button'
    _DROPDOWN_ITEM_CLASS = 'CoreLayout__headerDropdownItem-X4S98'
    _DROPDOWN_ITEM_LABEL_XPATH = '//*[@class="CoreLayout__headerDropdownItem-X4S98"]/p'

    # Seconds to wait for the page to expose a token / for the header to render.
    _PAGE_WAIT_SECONDS = 100
    _HEADER_WAIT_SECONDS = 10
    # get_human_info retries while its error counter is below this value.
    _MAX_RETRIES = 3

    def __init__(self, log_ad):
        """Bind to the login helper and start with an empty result list.

        Args:
            log_ad: Object exposing ``driver``, ``get_cookie`` and
                ``cookies_save`` (see the class docstring).
        """
        self.log_ad = log_ad
        self.driver = self.log_ad.driver
        self.human_info_list = []

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _wait_for_g_tk(self, service_name=None):
        """Wait until the page source contains a ``g_tk`` token and return it.

        Args:
            service_name: When not ``None``, the page source must also contain
                this text before the wait succeeds (used to make sure the page
                has switched to the expected service account).

        Returns:
            The digits of the first ``g_tk=...`` occurrence, as a string.
        """
        def _token_when_ready(_driver):
            page = self.driver.page_source
            if service_name is not None and service_name not in page:
                return False
            match = self._G_TK_PATTERN.search(page)
            # WebDriverWait.until() hands back the first truthy value, so the
            # token is taken from the very page snapshot that satisfied the wait.
            return match.group(1) if match else False

        return WebDriverWait(self.driver, self._PAGE_WAIT_SECONDS).until(_token_when_ready)

    def _fetch_delivery_metrics(self, token, cookies):
        """Request the first page of delivery metrics.

        Args:
            token: ``g_tk`` value to put in the query string.
            cookies: Cookie mapping sent with the request.

        Returns:
            ``(url, response)`` — the URL that was requested and the
            ``requests`` response object.
        """
        url = self._DELIVERY_METRICS_URL.format(token=token, time_=int(time.time()))
        # NOTE(review): no timeout is passed, so a stalled server blocks this call.
        return url, requests.get(url=url, cookies=cookies)

    def _current_service_name(self):
        """Return the text of the service-account label in the page header."""
        return self.driver.find_element(By.XPATH, self._CURRENT_SERVICE_XPATH).text

    def _open_service_switcher(self):
        """Open the account menu in the header and click its first button.

        Short random pauses follow each click so the drop-down has time to
        react before the next lookup.
        """
        account_button = WebDriverWait(self.driver, self._HEADER_WAIT_SECONDS).until(
            lambda _driver: self.driver.find_element(By.CSS_SELECTOR, self._ACCOUNT_BUTTON_CSS))
        WebDriverWait(self.driver, self._HEADER_WAIT_SECONDS).until(
            lambda _driver: account_button.is_displayed() and account_button.is_enabled())
        # Locate the element again right before clicking, as the original flow did.
        account_button = self.driver.find_element(By.CSS_SELECTOR, self._ACCOUNT_BUTTON_CSS)
        ActionChains(self.driver).move_to_element(account_button).click().perform()
        time.sleep(random.uniform(0.1, 0.2))
        self.driver.find_element(By.CSS_SELECTOR, self._SWITCH_BUTTON_CSS).click()
        time.sleep(random.uniform(0.1, 0.2))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def api_get_name(self):
        """Print the delivery-metrics response for every service account.

        Fetches official-account related data. Per the original author's note
        this is rarely used now; :meth:`get_human_info` is the main entry point.
        """
        def _print_metrics(service_name):
            token = self._wait_for_g_tk(service_name)
            cookies = self.log_ad.get_cookie(self.driver)
            # Fetch the official accounts.
            _url, rsp = self._fetch_delivery_metrics(token, cookies)
            print(service_name, rsp.text)

        # Service account that is already selected when the method is called.
        token = self._wait_for_g_tk()
        cookies = self.log_ad.get_cookie(self.driver)
        _url, rsp = self._fetch_delivery_metrics(token, cookies)
        print(self._current_service_name(), rsp.text)

        # service_loop overwrites 'service_name' with each account it selects.
        self.service_loop(_print_metrics, {'service_name': ''})

    def _collect_human_info(self, service_name):
        """Append the audience-package list of every account under one service.

        Args:
            service_name: Text that must be present in the page source before
                scraping starts; also stored on every collected record.
        """
        # Original author's note: this step takes less than one second.
        token = self._wait_for_g_tk(service_name)
        cookie_dict = self.log_ad.get_cookie(self.driver)
        # Fetch the appid of each official account.
        metrics_url, metrics_rsp = self._fetch_delivery_metrics(token, cookie_dict)
        print(metrics_url)
        print(metrics_rsp.text)
        # Original author's note: for ordinary users this must be fetched within 1s.
        for account in metrics_rsp.json()['list']:
            wechat_wxname = account['wxname']
            wechat_name = account['nickname']
            wechat_id = account['appid']
            redirect_url = self._REDIRECT_MP_URL.format(token=token, wechat_id=wechat_id)
            # One session per account so that cookies set during the redirect are
            # reused by the follow-up call; the context manager closes it afterwards.
            with requests.session() as session:
                redirect_rsp = session.get(redirect_url, cookies=cookie_dict)
                print(redirect_rsp.url)
                # Obtain the mp.weixin.qq.com token from the final URL.
                token_id = self._MP_TOKEN_PATTERN.findall(redirect_rsp.url)[0]
                print(token_id)
                # Fetch the audience packages.
                human_url = self._AUDIENCE_LIST_URL.format(
                    wechat_token=token_id, time_=int(time.time()))
                print(human_url)
                audience_rsp = session.get(url=human_url)
            print(audience_rsp.text)
            res_json = audience_rsp.json()
            res_json['service_name'] = service_name
            res_json['wechat_name'] = wechat_name
            res_json['wxname'] = wechat_wxname
            res_json['appid'] = wechat_id
            self.human_info_list.append(res_json)
        print(self.human_info_list)
        print(json.dumps(self.human_info_list))
        # Random pause before the caller moves on to the next service account.
        time.sleep(random.uniform(3, 5))

    def get_human_info(self, sql_session, err_num=0):
        """Collect audience packages for all service accounts, retrying on failure.

        The currently selected service account is scraped first, then
        :meth:`service_loop` visits the remaining ones. Afterwards the cookies
        are persisted through ``log_ad.cookies_save``.

        Args:
            sql_session: Passed through unchanged to ``log_ad.cookies_save``.
            err_num: Number of failures already counted. A failed attempt is
                retried while this counter is below 3; ``human_info_list`` is
                cleared before each retry.

        Returns:
            On success ``{'sucess': True, 'result_info': '', 'result_list': [...]}``
            where ``result_list`` is ``human_info_list``. After the last failed
            attempt ``{'sucess': False, 'result_info': <error text>}``. The
            misspelt ``'sucess'`` key is kept for existing callers.
        """
        attempt = err_num
        while True:
            try:
                service_name = self._current_service_name()
                self._collect_human_info(service_name)
                self.service_loop(self._collect_human_info, {'service_name': service_name})
                # NOTE(review): log_ad is passed explicitly as the first argument,
                # exactly as before; confirm against cookies_save's signature.
                self.log_ad.cookies_save(self.log_ad, sql_session)
                return {'sucess': True, 'result_info': '', "result_list": self.human_info_list}
            except Exception as exc:
                # Boundary handler: any failure is logged and either retried
                # or reported to the caller in the result dict.
                logging.error(str(exc))
                if attempt >= self._MAX_RETRIES:
                    return {'sucess': False, 'result_info': str(exc)}
                self.human_info_list = []
                attempt += 1

    def service_loop(self, function, kwargs):
        """Switch to each not-yet-visited service account and call ``function``.

        Args:
            function: Callable invoked as ``function(**kwargs)`` after every
                switch.
            kwargs: Keyword arguments for ``function``. If it contains the key
                ``'service_name'``, that entry is overwritten in place with the
                label of the account that was just selected.

        The account shown in the header when the loop starts counts as already
        visited, so ``function`` is not called for it here.
        """
        self.driver.execute_script('''
            window.scroll(0,1000000);
        ''')
        self._open_service_switcher()

        # Collect every service name offered by the drop-down.
        all_names = {
            item.text
            for item in self.driver.find_elements(By.CLASS_NAME, self._DROPDOWN_ITEM_CLASS)
        }
        # The service account that is currently selected.
        visited = {self._current_service_name()}

        # One pass per remaining service account.
        for _ in range(len(all_names) - 1):
            candidates = [
                (element, element.text)
                for element in self.driver.find_elements(By.XPATH, self._DROPDOWN_ITEM_LABEL_XPATH)
            ]
            for _element, label in candidates:
                print(label)
            for element, label in candidates:
                if label in visited:
                    continue
                visited.add(label)
                ActionChains(self.driver).move_to_element(element).click().perform()
                if 'service_name' in kwargs:
                    kwargs['service_name'] = label
                function(**kwargs)
                break
            # Re-open the drop-down for the next pass.
            self._open_service_switcher()
if __name__ == "__main__":
    # Running the file directly does nothing; it only defines WechatApi for import.
    pass
|