123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- from selenium import webdriver
- from selenium.webdriver import ActionChains
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.wait import WebDriverWait
- from wechat_action.create_ad_plan_idea import IdeaAction
- from wechat_action.human_ad import HumanAd
- from selenium.webdriver import ChromeOptions
- from selenium.webdriver.common.keys import Keys
- from sqlalchemy import Table
- from wechat_action import sql_tools
- import time
- import pickle
- from settings import using_config
- from wechat_action.sql_models import DB
- import requests
- import json
- import logging
- import re
class LogIn:
    """Selenium-driven login helper for the WeChat ad platform (a.weixin.qq.com).

    One instance owns one remote Chrome session (``self.driver``) plus a
    reflected handle on the ``wechat_cookies`` table, and exposes the steps of
    the flow as separate methods: fetch the QR-code image URL (``log_in``),
    wait for the scan (``log_in_wait``), switch service provider and open the
    ad page (``select_ad_master``), and persist the session cookies
    (``cookies_save``).
    """
    # TODO: the overall run/usage flow still needs a few rounds of rework.

    # Selenium Grid hub the remote Chrome session is created on.
    COMMAND_EXECUTOR = 'http://118.31.53.105:4555/wd/hub'
    CLIENT_URL = 'https://a.weixin.qq.com/client'
    # Text of the "ad delivery" link; also used as the logged-in marker.
    AD_LINK_TEXT = '广告投放'
    # Header element showing the currently selected service provider.
    SERVICE_NAME_XPATH = '//*[@id="root"]/div/header/div/div[3]/div/div[1]'
    # Cookie whose presence is treated as "login cookies have been set".
    LOGIN_COOKIE_NAME = 'token_ticket'

    def __init__(self, user_id):
        """Open the DB handle and a remote browser for ``user_id``.

        Args:
            user_id: Key used for this user's row in ``wechat_cookies``.
        """
        self.user_id = user_id
        # Reflect the table before starting the browser so that a database
        # failure does not leave an orphaned remote Chrome session behind.
        self.db = DB(config=using_config)
        self.wechat_cookies_table = Table('wechat_cookies', self.db.metadata,
                                          autoload=True, autoload_with=self.db.engine)
        self.driver = self.get_driver()

    def get_driver(self):
        """Create, maximize and return a remote Chrome WebDriver session."""
        options = ChromeOptions()
        # Keeps selenium from crashing quickly.
        options.add_argument("--disable-dev-shm-usage")
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        # NOTE(review): value 2 presumably means "block"; the stylesheet key
        # looks like a Firefox preference - confirm it has any effect in Chrome.
        prefs = {"profile.managed_default_content_settings.images": 2, 'permissions.default.stylesheet': 2}
        options.add_experimental_option("prefs", prefs)
        driver = webdriver.Remote(command_executor=self.COMMAND_EXECUTOR, options=options)
        driver.maximize_window()
        return driver

    def log_in(self):
        """Open the login page and return the ``src`` URL of the QR-code image.

        The driver is left switched into the login iframe.
        """
        logging.info('开始登录')
        self.driver.get('https://a.weixin.qq.com/index.html')
        img_selector = 'body > div.old-template > div > div > div.waiting.panelContent > div.wrp_code > img'
        frame_login = self.driver.find_element(By.XPATH, '//*[@id="login_container"]/iframe')
        self.driver.switch_to.frame(frame_login)
        qr_image = self.driver.find_element(By.CSS_SELECTOR, img_selector)
        return qr_image.get_attribute('src')

    def log_in_wait(self):
        """Block until the ad-delivery link is present (i.e. the scan succeeded).

        Raises:
            selenium TimeoutException: if the link does not appear in 6 minutes.
        """
        # Wait up to 6 minutes by default.
        WebDriverWait(self.driver, 6 * 60).until(
            lambda driver: driver.find_elements(By.LINK_TEXT, self.AD_LINK_TEXT))
        logging.info('登录成功')

    @staticmethod
    def cookies_save(log_ad, sql_session):
        """Persist the current session cookies of ``log_ad`` to the database.

        Updates the existing row for ``log_ad.user_id`` when one is known,
        otherwise inserts a new row. The driver is switched to the first
        window while collecting cookies and to the last window afterwards.

        Args:
            log_ad: The ``LogIn`` instance whose driver holds the session.
            sql_session: Session used to execute and commit the statement.

        Raises:
            ValueError: if the logged-in WeChat account differs from the one
                already stored for this user.
        """
        logging.info('update db cookie')
        driver = log_ad.driver
        table = log_ad.wechat_cookies_table
        # Switch window, click create ad, switch to the ad page.
        driver.switch_to.window(driver.window_handles[0])
        WebDriverWait(driver, 100).until(lambda d: d.find_element(By.CLASS_NAME, 'ui-mr-medium'))
        wechat_cookies, wechat_id_tmp = log_ad.wechat_cookie_pickle()
        wechat_id = sql_tools.get_wechat_id_from_cookies(log_ad.user_id, sql_session)
        if wechat_id:
            if wechat_id_tmp != wechat_id:
                raise ValueError("微信账号,非以前老账号,登录失败")
            # Persist the cookies to the database.
            statement = table.update() \
                .where(table.c.user_id == log_ad.user_id) \
                .values(cookies=wechat_cookies,
                        scan_action='done')
        else:
            # No id is stored yet on this branch, so record the id that was
            # just fetched from the live session rather than the empty lookup.
            wechat_cookies_info = {'user_id': log_ad.user_id, 'cookies': wechat_cookies,
                                   'wechat_id': wechat_id_tmp, 'scan_action': 'done'}
            statement = sql_tools.save_wechat_cookies(wechat_cookies_info=wechat_cookies_info,
                                                      table_wechat_cookies=table)
        sql_session.execute(statement)
        sql_session.commit()
        driver.switch_to.window(driver.window_handles[-1])

    @staticmethod
    def _match_service_entry(entries, service_name):
        """Return the last entry whose ``text`` contains ``service_name``.

        Raises:
            ValueError: if no entry matches.
        """
        matched = None
        for entry in entries:
            if service_name in entry.text:
                matched = entry  # keep scanning: the last match wins
        if matched is None:
            raise ValueError(
                'service provider {!r} not found in the account dropdown'.format(service_name))
        return matched

    def select_ad_master(self, service_name, wechat_name, sql_session):
        """Switch to ``service_name``, open the ad page of ``wechat_name``, save cookies.

        Args:
            service_name: Service-provider name expected in the page header.
            wechat_name: Account name typed into the search box.
            sql_session: Passed through to ``cookies_save``.

        Raises:
            ValueError: if ``service_name`` is not offered in the dropdown, or
                propagated from ``cookies_save``.
        """
        logging.info('开始切换服务商')
        driver = self.driver
        current_service = WebDriverWait(driver, 100).until(
            lambda d: d.find_element(By.XPATH, self.SERVICE_NAME_XPATH))
        if service_name != current_service.text:
            driver.execute_script('''
                window.scroll(0,1000000);
            ''')
            driver.find_element(
                By.CSS_SELECTOR,
                '#root > div > header > div > div.CoreLayout__account-2lIr0 > div').click()
            driver.find_element(
                By.CSS_SELECTOR,
                '#root > div > div.CoreLayout__headerDropdown-3xWkD > div > div:nth-child(1) > button').click()
            entries = driver.find_elements(By.CLASS_NAME, 'CoreLayout__headerDropdownItem-X4S98')
            self._match_service_entry(entries, service_name).click()
        # Pick the ad placement: first make sure the header shows the provider.
        WebDriverWait(driver, 100).until(
            lambda d: d.find_element(By.XPATH, self.SERVICE_NAME_XPATH).text == service_name)
        input_wechat_name = driver.find_element(By.CLASS_NAME, 'TextInput_new__iconRight-pekjS')
        input_wechat_name.click()
        input_wechat_name.send_keys(wechat_name)
        input_wechat_name.send_keys(Keys.RETURN)
        driver.execute_script('''
            window.scroll(100000,1000000);
            var e_one=document.getElementsByClassName('Table_new__wrapper-1cpZN')[0];
            e_one.scroll(10000,100000);
        ''')
        time.sleep(0.1)
        # TODO: get rid of time.sleep
        time.sleep(5)

        def sole_clickable_ad_link(d):
            # Truthy (the element) only when exactly one link is usable.
            usable = [link for link in d.find_elements(By.LINK_TEXT, self.AD_LINK_TEXT)
                      if link.is_enabled() and link.is_displayed()]
            return usable[0] if len(usable) == 1 else False

        WebDriverWait(driver, 100).until(
            lambda d: d.find_elements(By.LINK_TEXT, self.AD_LINK_TEXT))
        # Click the very element that passed the check, not a fresh lookup
        # whose first hit could be a hidden duplicate.
        WebDriverWait(driver, 100).until(sole_clickable_ad_link).click()
        WebDriverWait(driver, 100).until(lambda d: len(d.window_handles) > 1)
        logging.info('切换服务商成功')
        self.cookies_save(self, sql_session)

    @staticmethod
    def get_cookie(driver, login_cookie=True):
        """Return the driver's cookies as a ``{name: value}`` dict.

        Args:
            driver: WebDriver (or any object exposing ``get_cookies()``).
            login_cookie: When true, first wait (up to 100 s) until the
                ``token_ticket`` cookie is present.
        """
        if login_cookie:
            WebDriverWait(driver, 100).until(
                lambda d: any(c['name'] == LogIn.LOGIN_COOKIE_NAME for c in d.get_cookies()))
        return {cookie['name']: cookie['value'] for cookie in driver.get_cookies()}

    def wechat_cookie_pickle(self):
        """Collect the session cookies and the WeChat id of the logged-in account.

        Returns:
            tuple: ``(pickled cookie list as bytes, wx_id)`` where ``wx_id`` is
            read from ``data[0]['wx_id']`` of the ``check_login`` response.
        """
        driver = self.driver
        driver.get(self.CLIENT_URL)
        WebDriverWait(driver, 100).until(
            lambda d: any(c['name'] == self.LOGIN_COOKIE_NAME for c in d.get_cookies()))
        # Reuse the match returned by the wait instead of re-reading the page
        # source, which may have changed in between.
        g_tk = WebDriverWait(driver, 100).until(
            lambda d: re.findall(r'g_tk=(\d+)&', d.page_source))[0]
        cookies_obj = pickle.dumps(driver.get_cookies())
        wechat_cookies = self.get_cookie(driver)
        wechat_url = 'https://a.weixin.qq.com/cgi-bin/agency/check_login?g_tk={g_tk}&_={time_p}'.format(
            g_tk=g_tk, time_p=int(time.time() * 1000))
        rsp = requests.get(url=wechat_url, cookies=wechat_cookies)
        wechat_id = rsp.json()['data'][0]['wx_id']
        return cookies_obj, wechat_id

    def wechat_cookies_check_alive(self, driver_cookies):
        """Check whether previously saved cookies still give a valid session.

        Args:
            driver_cookies: Iterable of cookie dicts accepted by
                ``driver.add_cookie``.

        Returns:
            bool: ``True`` when usable, i.e. the metrics response does not
            contain ``4101``.
        """
        check_url = 'https://a.weixin.qq.com/cgi-bin/agency/get_delivery_metrics?page=1&page_size=10&search_key=&order_by=&ascending=1&only_collect=0&g_tk=5381&_={}'.format(
            int(time.time() * 1000))
        driver = self.driver
        # NOTE(review): the detour via an unrelated site presumably resets
        # page state before the cookies are injected - confirm.
        driver.get('https://www.baidu.com')
        # Cookies can only be added while on the domain they belong to.
        driver.get(self.CLIENT_URL)
        for cookie in driver_cookies:
            driver.add_cookie(cookie)
        driver.get(self.CLIENT_URL)
        driver.get(check_url)
        # NOTE(review): 4101 is presumably the "not logged in" return code.
        return '4101' not in driver.page_source

    def upadte_user_info(self):
        """Placeholder: refresh user-related data (not implemented yet)."""
        # TODO: update user-related information.
        # Refresh on every login: official-account data, audience-pack data.
        pass

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    update_user_info = upadte_user_info

    def get_driver_loged(self):
        """Return the WebDriver owned by this instance."""
        return self.driver

    def refresh_driver(self):
        """Close every extra window, then reload the platform home page.

        Best effort: unexpected errors are logged and retried; the method gives
        up silently after more than 3 of them.
        """
        from selenium.common.exceptions import NoAlertPresentException, NoSuchWindowException

        err_num = 0
        logging.info('开始刷新chrome')
        while True:
            try:
                handles = self.driver.window_handles
                self.driver.switch_to.window(handles[-1])
                if len(handles) > 1:
                    self.driver.execute_script('window.close();')
                    time.sleep(1)
                    try:
                        # Handle the case where closing pops up a dialog.
                        self.driver.switch_to.alert.accept()
                    except (NoAlertPresentException, NoSuchWindowException):
                        # No dialog appeared / the window is already gone:
                        # the normal case, must not consume the error budget.
                        logging.debug('window closed without a dialog')
                else:
                    self.driver.get('https://a.weixin.qq.com')
                    break
            except Exception as e:
                logging.error(e)
                err_num = err_num + 1
                if err_num > 3:
                    break
        logging.info('刷新chrome 结束')
|