"""Tornado HTTP service for the WeChat advertising automation backend.

The handlers in this module store ad-plan / landing-page-layout "typesetting"
data, record pending actions in the ``action_record`` table and start
background threads (``user_action.carry_plan`` / ``user_action.get_human_info``)
that drive a browser session created by ``LogIn``.
"""
import json
import logging
import pickle
import threading
from datetime import datetime
from logging import handlers

import tornado.ioloop
import tornado.log
import tornado.web
from sqlalchemy import Table

from settings import using_config
from web_module import user_action
from wechat_action import sql_tools
from wechat_action.login_ad import LogIn
from wechat_action.sql_models import DB

# TODO: add supervisor to keep the process alive
# TODO: improve the tornado setup when there is time
# TODO: a deployment tool is needed to keep production stable
db = DB(config=using_config)

# ``autoload_with`` alone triggers reflection; the separate ``autoload=True``
# flag is deprecated since SQLAlchemy 1.4 and rejected by 2.0.
wechat_cookies_table = Table('wechat_cookies', db.metadata, autoload_with=db.engine)
layout_typesetting_table = Table('layout_typesetting', db.metadata, autoload_with=db.engine)
ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata, autoload_with=db.engine)
action_record_table = Table('action_record', db.metadata, autoload_with=db.engine)

layout_create_action = 'create_ad_layout'
ad_plan_create_action = 'create_ad_plan'
refresh_wechat_action = 'refresh_wechat_info'

# Timestamp format embedded in task names (note the comma).
_TASK_TIME_FORMAT = "%Y-%m-%d, %H:%M:%S"
# Timestamp format used for create/update times in responses.
_ROW_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


# 1. local service
# 2. online docker-selenium service


def _param_error():
    """Return the response payload used when a required parameter is missing."""
    return {'status': {'msg': 'url parameter error', "RetCode": 400}}


def _success(**extra):
    """Return the success payload, with ``extra`` keys appended after ``status``."""
    body = {'status': {'msg': 'success', "RetCode": 200}}
    body.update(extra)
    return body


def _task_name(user_id, action):
    """Build the task name that groups the action records of one request."""
    return 'user_id: {user_id} time:{time_sign} action:{action}'.format(
        user_id=user_id,
        time_sign=datetime.now().strftime(_TASK_TIME_FORMAT),
        action=action)


def _store_plan_typesetting(sql_session, user_id, ad_plan_list):
    """Insert one ``ad_plan_typesetting`` row per plan and commit."""
    for plan in ad_plan_list:
        typesetting_info = {'user_id': user_id,
                            'name': plan['title'],
                            'typesetting': json.dumps(plan, ensure_ascii=False)}
        insert_stmt = sql_tools.save_ad_plan_typesetting_info(
            ad_plan_typesetting_info=typesetting_info,
            table_ad_plan_typesetting=ad_plan_typesetting_table)
        sql_session.execute(insert_stmt)
    sql_session.commit()


def _format_typesetting_rows(rows, name_key):
    """Convert ``(typesetting, name, create_time, update_time)`` rows to dicts.

    ``name_key`` is the response key under which the row name is reported.
    """
    formatted = []
    for index, (typesetting, name, create_time, update_time) in enumerate(rows):
        formatted.append({
            'typesetting': json.loads(typesetting),
            name_key: name,
            'id': index,
            'create_time': create_time.strftime(_ROW_TIME_FORMAT),
            'update_time': update_time.strftime(_ROW_TIME_FORMAT),
        })
    return formatted


def _refresh_wechat_cookies(tornado_web, user_id):
    """Open a login session for ``user_id`` and report its state to the client.

    If stored cookies exist and are still alive, the driver is pointed at the
    advertising home page and a plain success payload is written. Otherwise a
    login is started, the user's scan action is updated and the value returned
    by ``log_in()`` is sent back as ``wechat_code``.

    Returns:
        ``(log_ad, cookie_canuse)`` - the ``LogIn`` object and whether the
        stored cookies could be reused.

    Raises:
        Whatever the login / database helpers raise; the driver is quit first
        so a failed login does not leave a browser session behind.
    """
    sql_session = db.DBSession()
    cookie_blob = sql_tools.get_wechat_cookies(sql_session, user_id=user_id)
    # Creating the LogIn object makes the driver available.
    log_ad = LogIn(user_id=user_id)
    try:
        cookie_canuse = False
        if cookie_blob:
            # NOTE(review): pickle.loads executes arbitrary code on malicious
            # input; this is only acceptable while the blob is written
            # exclusively by this service - confirm.
            cookie_canuse = bool(log_ad.wechat_cookies_check_alive(pickle.loads(cookie_blob)))
        if cookie_canuse:
            # Cookies are still valid.
            log_ad.driver.get('https://a.weixin.qq.com/index.html')
            tornado_web.write(_success())
        else:
            # Cookies are missing or expired: start a fresh login.
            wechat_code = log_ad.log_in()
            sql_tools.update_user_scan_action(user_id, sql_session)
            tornado_web.write(_success(wechat_code=wechat_code))
            if cookie_blob:
                logging.info('cookie can not use')
    except Exception:
        log_ad.driver.quit()
        raise
    return log_ad, cookie_canuse


def _start_wechat_refresh(handler, user_id, sql_session):
    """Schedule a ``refresh_wechat_info`` task unless tasks are already pending.

    The backlog check runs first, so a backlog writes exactly one response
    body and starts no browser session. Otherwise the login session is
    prepared (which writes the response), a ``todo`` action record is stored
    and ``user_action.get_human_info`` is started in a background thread.

    Returns:
        ``True`` when the task was started, ``False`` when tasks were pending.
    """
    if create_ad_plan.check_task(user_id=user_id):
        logging.info('任务有堆积')
        handler.write({'status': {'msg': '任务有堆积', "RetCode": 200}})
        return False

    log_ad, cookie_canuse = _refresh_wechat_cookies(handler, user_id=user_id)
    try:
        task_name = _task_name(user_id, 'refresh_wechat_info')
        # Action record for the refresh; it targets no particular account.
        action_info = {'user_id': user_id,
                       'service_name': '',
                       'wechat_name': '',
                       'action_type': refresh_wechat_action,
                       'object_name': '',
                       'task_name': task_name,
                       'status': 'todo'}
        record_insert = sql_tools.save_action_record(
            action_record_info=action_info,
            table_action_record=action_record_table)
        sql_session.execute(record_insert)
        sql_session.commit()
        threading.Thread(target=user_action.get_human_info,
                         args=(user_id, log_ad, db, cookie_canuse, task_name)).start()
    except Exception:
        # The worker thread never took ownership of the driver.
        log_ad.driver.quit()
        raise
    return True


class BaseHandler(tornado.web.RequestHandler):
    """Base handler that sets permissive CORS headers on every response."""

    def options(self):
        """Answer OPTIONS requests with the default headers and an empty body."""

    def set_default_headers(self):
        self.set_header('Access-Control-Allow-Origin', '*')
        self.set_header('Access-Control-Allow-Headers', '*')
        self.set_header('Access-Control-Max-Age', 1000)
        self.set_header('Content-type', '*')
        self.set_header('Access-Control-Allow-Methods', '*')


class create_ad_plan_local(BaseHandler):
    """Store ad plans locally without scheduling any task."""

    def post(self):
        """Expect a JSON body with ``user_id`` and ``plan_list``."""
        request_dict = json.loads(self.request.body)
        # .get() so that a missing key reaches the parameter check below
        # instead of raising KeyError.
        user_id = request_dict.get('user_id')
        ad_plan_list = request_dict.get('plan_list')
        if user_id is None or ad_plan_list is None:
            self.write(_param_error())
            return
        # Names are precise to the millisecond and assumed globally unique.
        sql_session = db.DBSession()
        _store_plan_typesetting(sql_session, user_id, ad_plan_list)
        self.write(_success())


class create_ad_plan(BaseHandler):
    """Store ad plans, record the resulting actions and start executing them."""

    # TODO: while tornado is running the database must not be modified -
    #       decide how to handle this once deployed

    @staticmethod
    def check_task(user_id):
        """Return ``sql_tools.get_task_in_hand_num`` for ``user_id``.

        Callers treat a truthy result as "tasks are still pending".
        """
        sql_session = db.DBSession()
        return sql_tools.get_task_in_hand_num(user_id, sql_session)

    def save_task_info(self, user_id, ad_plan_list, sql_session, task_name):
        """Persist the plans and one layout + one plan action record per plan.

        Writes the parameter-error payload and returns without storing
        anything when ``user_id`` or ``ad_plan_list`` is ``None``.
        """
        # 2. store the data in the database
        if user_id is None or ad_plan_list is None:
            self.write(_param_error())
            return
        # 2.1 store the plan data
        _store_plan_typesetting(sql_session, user_id, ad_plan_list)
        # 2.2 store the action records
        for plan in ad_plan_list:
            for action_type in (layout_create_action, ad_plan_create_action):
                if action_type == ad_plan_create_action:
                    object_name = plan['title']
                else:
                    object_name = plan['idea']['jump_type_page_type']['layout_name']
                action_info = {'user_id': user_id,
                               'service_name': plan['service_name'],
                               'wechat_name': plan['wechat_name'],
                               'action_type': action_type,
                               'object_name': object_name,
                               'task_name': task_name,
                               'status': 'todo'}
                record_insert = sql_tools.save_action_record(
                    action_record_info=action_info,
                    table_action_record=action_record_table)
                sql_session.execute(record_insert)
        sql_session.commit()

    def post(self):
        """Expect a JSON body with ``userId`` and ``planList``.

        With no pending tasks the login session is prepared (which writes the
        response) and ``user_action.carry_plan`` is started in a thread;
        otherwise the plans are only queued and a success payload is written.
        """
        sql_session = db.DBSession()
        log_ad = None
        try:
            request_dict = json.loads(self.request.body)
            ad_plan_list = request_dict.get('planList')
            user_id = request_dict.get('userId')
            # Validate before touching the browser or the database so that
            # only one response body is ever written.
            if user_id is None or ad_plan_list is None:
                self.write(_param_error())
                return
            task_name = _task_name(user_id, 'create_plan')
            # 4. start running
            if not self.check_task(user_id=user_id):
                # 1. check whether the stored cookies are usable
                log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
                self.save_task_info(user_id, ad_plan_list, sql_session, task_name)
                threading.Thread(target=user_action.carry_plan,
                                 args=(user_id, ad_plan_list, log_ad, db,
                                       cookie_canuse, task_name)).start()
            else:
                self.save_task_info(user_id, ad_plan_list, sql_session, task_name)
                self.write(_success())
        except Exception as e:
            if log_ad:
                log_ad.driver.quit()
            logging.exception("%s", e)
            # TODO: this part needs proper handling; switch to flask later
            self.write('error')
        finally:
            sql_session.commit()


class get_ad_plan_local(BaseHandler):
    """List the locally stored ad plans of a user."""

    def get(self):
        """Query args: ``user_id`` (required), ``plan_name`` (optional filter)."""
        user_id = self.get_argument('user_id', None)
        plan_name = self.get_argument('plan_name', None)
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        # TODO: make this query more efficient, LIKE is too slow
        # A missing name is passed on as the empty string.
        rows = sql_tools.get_plan_typesetting_rough(sql_session=sql_session,
                                                    user_id=user_id,
                                                    typesetting_name=plan_name or '')
        self.write(_success(local_ad_plan_info=_format_typesetting_rows(rows, 'ad_plan_name')))


class create_ad_layout_local(BaseHandler):
    """Store a landing-page layout locally."""

    def post(self):
        """Expect JSON with ``user_id``, ``layout_typesetting``, ``layout_name``."""
        # TODO: return a message when the layout_name already exists
        request_dict = json.loads(self.request.body)
        user_id = request_dict.get('user_id')
        layout_typesetting = request_dict.get('layout_typesetting')
        layout_name = request_dict.get('layout_name')
        if user_id is None or layout_name is None or layout_typesetting is None:
            self.write(_param_error())
            return
        # Layout names are precise to the millisecond and assumed globally unique.
        sql_session = db.DBSession()
        layout_typesetting_info = {'user_id': user_id,
                                   'name': layout_name,
                                   'typesetting': layout_typesetting}
        insert_stmt = sql_tools.save_layout_typesetting_info(
            layout_typesetting_info=layout_typesetting_info,
            table_layout_typesetting=layout_typesetting_table)
        sql_session.execute(insert_stmt)
        sql_session.commit()
        self.write(_success())


class get_ad_layout_local(BaseHandler):
    """List the locally stored landing-page layouts of a user."""

    def get(self):
        """Query args: ``user_id`` (required), ``layout_name`` (optional filter)."""
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('layout_name', None)
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        # TODO: make this query more efficient, LIKE is too slow
        rows = sql_tools.get_layout_typesetting_rough(sql_session=sql_session,
                                                      user_id=user_id,
                                                      typesetting_name=layout_name or '')
        self.write(_success(local_layout_info=_format_typesetting_rows(rows, 'layout_name')))


class get_scan_status(BaseHandler):
    """Report the QR-code scan status of a user."""

    def get(self):
        """Query args: ``user_id`` (required)."""
        user_id = self.get_argument("user_id", None)
        # Validate before querying.
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        status = sql_tools.get_scan_action_status(user_id, sql_session)
        self.write(_success(scan_action_status=status))


# TODO: the wechat_info and human_info tables need matching improvements
class ad_human_info(BaseHandler):
    """Return the stored audience packages, or trigger a refresh of them."""

    @staticmethod
    def refresh_wechat_cookies(tornado_web, user_id):
        """Prepare a login session; see ``_refresh_wechat_cookies``."""
        return _refresh_wechat_cookies(tornado_web, user_id)

    # 1. audience package retrieval
    def get(self):
        """Query args: ``user_id``, ``is_refresh``, ``wechat_name``,
        ``service_name`` (all required), ``human_package_name`` (optional).

        ``is_refresh == 1`` schedules a refresh task; any other integer
        returns the stored packages, optionally filtered by name substring.
        """
        sql_session = db.DBSession()
        try:
            user_id = self.get_argument("user_id", None)
            human_package_name = self.get_argument('human_package_name', None)
            is_refresh = self.get_argument("is_refresh", None)
            wechat_name = self.get_argument('wechat_name', None)
            service_name = self.get_argument('service_name', None)
            if user_id is None or is_refresh is None or wechat_name is None or service_name is None:
                self.write(_param_error())
                return
            # TODO: look into the lifecycle of a request that involves the
            #       selenium driver and how tornado handles it
            if int(is_refresh) == 1:
                _start_wechat_refresh(self, user_id, sql_session)
                return
            raw = sql_tools.get_human_info(sql_session=sql_session,
                                           service_name=service_name,
                                           wechat_name=wechat_name)
            packages = json.loads(raw)
            if human_package_name:
                packages = [p for p in packages if human_package_name in p['name']]
            for index, package in enumerate(packages):
                package['id'] = index
            self.write(_success(human_info=packages))
        except Exception as e:
            logging.exception("%s", e)
        finally:
            sql_session.commit()


class refresh_wechat_info(BaseHandler):
    """Trigger a refresh of the WeChat account information."""

    # TODO: refresh and creation should be limited to 3 minutes

    @staticmethod
    def refresh_wechat_cookies(tornado_web, user_id):
        """Prepare a login session; see ``_refresh_wechat_cookies``."""
        return _refresh_wechat_cookies(tornado_web, user_id)

    def get(self):
        """Query args: ``user_id`` (required)."""
        sql_session = db.DBSession()
        try:
            user_id = self.get_argument("user_id", None)
            if user_id is None:
                self.write(_param_error())
                return
            _start_wechat_refresh(self, user_id, sql_session)
        except Exception as e:
            # Log instead of silently swallowing the failure.
            logging.exception("%s", e)


class ad_wechat_info(BaseHandler):
    """Return the stored service/WeChat account pairs, or trigger a refresh."""

    # 1. official-account information retrieval
    def get(self):
        """Query args: ``userId`` and ``isRefresh`` (both required)."""
        sql_session = db.DBSession()
        try:
            user_id = self.get_argument("userId", None)
            is_refresh = self.get_argument("isRefresh", None)
            if user_id is None or is_refresh is None:
                self.write(_param_error())
                return
            if int(is_refresh) == 1:
                # Other tasks in progress are checked first; the refresh is
                # only scheduled when there are none.
                _start_wechat_refresh(self, user_id, sql_session)
                return
            rows = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
            wechat_info = [{'service_name': service_name, 'wechat_name': wechat_name}
                           for service_name, wechat_name in rows]
            self.write(_success(wechat_info=wechat_info))
        except Exception as e:
            logging.exception("%s", e)
        finally:
            sql_session.commit()


class delete_ad_layout(BaseHandler):
    """Delete a stored layout via ``sql_tools.delete_layout_typesetting_vir``."""

    def get(self):
        """Query args: ``user_id`` and ``layout_name`` (both required)."""
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('layout_name', None)
        if user_id is None or layout_name is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        sql_tools.delete_layout_typesetting_vir(sql_session=sql_session,
                                                user_id=user_id,
                                                typesetting_name=layout_name)
        self.write(_success())


class delete_ad_plan(BaseHandler):
    """Delete a stored plan via ``sql_tools.delete_ad_plan_typesetting_vir``."""

    def get(self):
        """Query args: ``user_id`` and ``plan_name`` (required),
        ``service_name`` and ``wechat_name`` (optional).
        """
        user_id = self.get_argument('user_id', None)
        plan_name = self.get_argument('plan_name', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        if user_id is None or plan_name is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session,
                                                 user_id=user_id,
                                                 typesetting_name=plan_name,
                                                 wechat_name=wechat_name,
                                                 service_name=service_name)
        self.write(_success())


class get_ad_wechat_service_name(BaseHandler):
    """List the service names known for a user."""

    def get(self):
        """Query args: ``user_id`` (required)."""
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        rows = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id)
        # Each row is reported as-is under 'service_name'.
        self.write(_success(wechat_info=[{'service_name': row} for row in rows]))


class get_ad_wechat_wechat_name(BaseHandler):
    """List the WeChat account names under one service name."""

    def get(self):
        """Query args: ``user_id`` and ``service_name`` (both required)."""
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        if user_id is None or service_name is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        rows = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session,
                                                     user_id=user_id,
                                                     service_name=service_name)
        wechat_info = [{'service_name': row_service, 'wechat_name': row_wechat}
                       for row_service, row_wechat in rows]
        self.write(_success(wechat_info=wechat_info))


class get_plan_action_record(BaseHandler):
    """List plan action records, optionally filtered."""

    def get(self):
        """Query args: ``user_id`` (required); ``service_name``,
        ``wechat_name``, ``status`` and ``plan_name`` (optional filters).
        """
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        status = self.get_argument('status', None)
        plan_name = self.get_argument('plan_name', None)
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        rows = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id,
                                         service_name=service_name,
                                         wechat_name=wechat_name,
                                         status=status, plan_name=plan_name)
        records = []
        for index, row in enumerate(rows):
            (_row_user, name, row_service, row_wechat,
             create_time, row_status, typesetting, wechat_id_info) = row
            records.append({
                'typesetting': json.loads(typesetting),
                'ad_plan_name': name,
                'id': index,
                'create_time': create_time.strftime(_ROW_TIME_FORMAT),
                'service_name': row_service,
                'wechat_name': row_wechat,
                'wechat_id_info': wechat_id_info,
                'status': row_status,
            })
        self.write(_success(local_ad_plan_info=records))


class get_all_ad_task(BaseHandler):
    """Summarise every task of a user with per-status counts."""

    def get(self):
        """Query args: ``user_id`` (required)."""
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write(_param_error())
            return
        sql_session = db.DBSession()
        rows = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id)

        channel = 'wechat'
        # NOTE(review): this flag is shared by all tasks - one 'pyq' row marks
        # every task in the response as 'pyq'. Confirm whether it should be
        # tracked per task.
        location = ''
        tasks = {}
        for task_name, status, task_status_num, create_time, typesetting in rows:
            if json.loads(typesetting)['plan_base'][1] == 'pyq':
                location = 'pyq'
            tasks.setdefault(task_name, {})[status] = (
                task_status_num, create_time.strftime(_ROW_TIME_FORMAT))

        summaries = []
        # TODO: sort the dict
        for index, (task_name, by_status) in enumerate(tasks.items()):
            task_info = {}
            sum_num = 0
            create_time = None
            # create_time ends up as the value of the last status entry.
            for status_key, (status_count, create_time) in by_status.items():
                sum_num += status_count
                task_info[status_key] = status_count
            # A task is 'todo' as long as any of its records still is.
            status = 'todo' if 'todo' in task_info else 'done'
            task_info['sum_num'] = sum_num
            summaries.append({'task_name': task_name,
                              'task_info': task_info,
                              'create_time': create_time,
                              'channel': channel,
                              'localtion': location,
                              'id': index,
                              'status': status})
        self.write(_success(local_ad_plan_info=summaries))


def heart_jump():
    """Placeholder for a heartbeat check (not implemented)."""
    # TODO: tornado heartbeat - a thread that keeps checking, with a thread
    #       lifetime of 60 minutes
    pass


def make_app():
    """Build the Tornado application with all active routes."""
    return tornado.web.Application([
        ("/get_all_ad_task", get_all_ad_task),  # status of all tasks
        ("/create_ad_plan", create_ad_plan),
        # ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
        ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
        # ("/create_ad_plan_local", create_ad_plan_local),
        ("/create_ad_layout_local", create_ad_layout_local),
        ("/get_layout_local", get_ad_layout_local),
        ("/get_ad_plan_local", get_ad_plan_local),
        ("/delete_layout_local", delete_ad_layout),
        ("/delete_ad_plan_local", delete_ad_plan),
        ("/get_scan_status", get_scan_status),
        # ("/create_ad_layout_remote", create_ad_layout_remote),
        ("/ad_human_info", ad_human_info),
        ("/ad_wechat_info", ad_wechat_info),
        ("/get_plan_action_record", get_plan_action_record),
    ], debug=True, autoreload=True)


def main():
    """Configure logging and serve the application on port 8888."""
    # basicConfig already installs the rotating file handler on the root
    # logger; a second plain FileHandler on the same file would write every
    # record twice.
    logging.basicConfig(
        handlers=[
            logging.handlers.RotatingFileHandler('./tornado.log',
                                                 maxBytes=10 * 1024 * 1024,
                                                 backupCount=5,
                                                 encoding='utf-8'),
            logging.StreamHandler(),  # console output
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()