from wechat_action.sql_models import DB from settings import using_config import tornado.log import tornado.ioloop import tornado.web from logging import handlers from wechat_action.login_ad import LogIn from wechat_action import sql_tools import threading from web_module import user_action from sqlalchemy import Table import json import pickle # TODO:需要添加上supervisor,来维护进程 # TODO:有时间需要对tornado进行改进 # TODO:需要有一套上线工具,来维持线上稳定 db = DB(config=using_config) wechat_cookies_table = Table('wechat_cookies', db.metadata, autoload=True, autoload_with=db.engine) layout_typesetting_table = Table('layout_typesetting', db.metadata, autoload=True, autoload_with=db.engine) ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata, autoload=True, autoload_with=db.engine) action_record_table = Table('action_record', db.metadata, autoload=True, autoload_with=db.engine) # 1.实现本机服务 # 2.实现线上docker-selenium服务 class BaseHandler(tornado.web.RequestHandler): def options(self): pass def set_default_headers(self): self.set_header('Access-Control-Allow-Origin', '*') self.set_header('Access-Control-Allow-Headers', '*') self.set_header('Access-Control-Max-Age', 1000) self.set_header('Content-type', '*') self.set_header('Access-Control-Allow-Methods', '*') class create_ad_plan_local(BaseHandler): def post(self): request_dict = json.loads(self.request.body, encoding='utf-8') user_id = request_dict['user_id'] ad_plan_list = request_dict['plan_list'] print(user_id, ad_plan_list) sql_session = db.DBSession() if user_id is None or ad_plan_list is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 for _ in ad_plan_list: ad_plan_name = _['title'] ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name, 'typesetting': json.dumps(_, ensure_ascii=False)} ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info( ad_plan_typesetting_info=ad_plan_typesetting_info, table_ad_plan_typesetting=ad_plan_typesetting_table) sql_session.execute(ad_plan_typesetting_inserte) sql_session.commit() self.write({'status': {'msg': 'success', "RetCode": 200}}) class create_ad_plan(BaseHandler): # TODO:只要tornado开着就不允许修改数据库,------想好之后上线如何操作 # TODO:需要与刷新用户cookie相关action 联动,---------正在刷新用户信息时,不能进行计划创建.反之同理 def post(self): request_dict = json.loads(self.request.body, encoding='utf-8') print(request_dict) user_id = request_dict['user_id'] ad_plan_list = request_dict['plan_list'] # 1.查看是否cookie可用 log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id) # 2.数据存入数据库 print(user_id, ad_plan_list) sql_session = db.DBSession() if user_id is None or ad_plan_list is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 2.1存计划数据 for _ in ad_plan_list: ad_plan_name = _['title'].replace(' ', '')[:29] ad_plan_typesetting_info = {'user_id': user_id, 'name': ad_plan_name, 'typesetting': json.dumps(_, ensure_ascii=False)} print('typesetting_info') print(_) print(ad_plan_typesetting_info) ad_plan_typesetting_inserte = sql_tools.save_ad_plan_typesetting_info( ad_plan_typesetting_info=ad_plan_typesetting_info, table_ad_plan_typesetting=ad_plan_typesetting_table) sql_session.execute(ad_plan_typesetting_inserte) # 4.开始运行 threading.Thread(target=user_action.carry_plan, args=(user_id, ad_plan_list, log_ad, db, cookie_canuse)).start() class get_ad_plan_local(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) layout_name = self.get_argument('plan_name', None) sql_session = db.DBSession() if user_id is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 if layout_name: result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id, typesetting_name=layout_name) else: # TODO:之后修改一下,让其查询效率高点,like效率过低 layout_name = '' result = sql_tools.get_plan_typesetting_rough(sql_session=sql_session, user_id=user_id, typesetting_name=layout_name) print(result) result_ = [] for i in range(len(result)): print(result[i]) typesetting, name, create_time, update_time = result[i] _ = {} _['typesetting'] = json.loads(typesetting) _['ad_plan_name'] = name _['id'] = i _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S") _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S") result_.append(_) self.write({'status': {'msg': 'success', "RetCode": 200}, 'local_ad_plan_info': result_}) class create_ad_layout_local(BaseHandler): def post(self): # TODO:返回一个layout_name重复的一个信息 request_dict = json.loads(self.request.body) user_id = request_dict['user_id'] layout_typesetting = request_dict['layout_typesetting'] layout_name = request_dict['layout_name'].replace(' ', '')[:29] print(user_id, layout_typesetting, layout_name) print('layout-typesetting', type(layout_typesetting), layout_typesetting) sql_session = db.DBSession() if user_id is None or layout_name is None or layout_typesetting is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 layout_typesetting_info = {'user_id': user_id, 'name': layout_name, 'typesetting': layout_typesetting} layout_typesetting_inserte = sql_tools.save_layout_typesetting_info( layout_typesetting_info=layout_typesetting_info, table_layout_typesetting=layout_typesetting_table) sql_session.execute(layout_typesetting_inserte) sql_session.commit() self.write({'status': {'msg': 'success', "RetCode": 200}}) class get_ad_layout_local(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) layout_name = self.get_argument('layout_name', None) sql_session = db.DBSession() if user_id is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 if layout_name: result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id, typesetting_name=layout_name) else: # TODO:之后修改一下,让其查询效率高点,like效率过低 layout_name = '' result = sql_tools.get_layout_typesetting_rough(sql_session=sql_session, user_id=user_id, typesetting_name=layout_name) print(result) result_ = [] for i in range(len(result)): print(result[i]) typesetting, name, create_time, update_time = result[i] _ = {} _['typesetting'] = json.loads(typesetting) _['layout_name'] = name _['id'] = i _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S") _['update_time'] = update_time.strftime("%Y-%m-%d %H:%M:%S") result_.append(_) self.write({'status': {'msg': 'success', "RetCode": 200}, 'local_layout_info': result_}) # TODO:wechat_info,human_info 这两张表有空时需要进行对应改进 # TODO:ad_human_info ,ad_wecaht_info 两个的行为需要与create_ad_plan 进行交互 class ad_human_info(BaseHandler): # TODO:设置一下update---table,如果失败了sql_session需要关闭 @staticmethod def refresh_wechat_cookies(tornado_web, user_id): # TODO:添加互动接口,添加状态字段,打开selenium就变换 # 1.返回二维码链接 # ----1.查看cookie是否可用 sql_session = db.DBSession() cookie_db = sql_tools.get_wechat_cookies(sql_session, user_id=user_id) # 进行登录操作 log_ad = LogIn(user_id=user_id) # 使driver可以使用 cookie_canuse = False if cookie_db: cookie_db = pickle.loads(cookie_db) if not log_ad.wechat_cookies_check_alive(cookie_db): # cookie 不能使用 wechat_code = log_ad.log_in() tornado_web.write({'status': {'msg': 'success', "RetCode": 200}, 'wechat_code': wechat_code}) print('cookie can not use') else: # cookie 可以继续使用 cookie_canuse = True log_ad.driver.get('https://a.weixin.qq.com/index.html') tornado_web.write({'status': {'msg': 'success', "RetCode": 200}}) else: # cookie 不能使用 wechat_code = log_ad.log_in() tornado_web.write({'status': {'msg': 'success', "RetCode": 200}, 'wechat_code': wechat_code}) return log_ad, cookie_canuse # 1.人群包获取 def get(self): sql_session = db.DBSession() try: # TODO:添加分页 # 0.是否刷新 # 1.获取userid,以及是否刷新 user_id = self.get_argument("user_id", None) human_package_name = self.get_argument('human_package_name', None) is_refresh = self.get_argument("is_refresh", None) wechat_name = self.get_argument('wechat_name', None) service_name = self.get_argument('service_name', None) print(user_id, is_refresh) if user_id is None or is_refresh is None or wechat_name is None or service_name is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期 if int(is_refresh) == 1: log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id) threading.Thread(target=user_action.get_human_info, args=( user_id, log_ad, db, cookie_canuse)).start() else: # 1.查看是否在刷新, # 在刷新中, # 返回正在刷新 # -------不管上面逻辑让他们多刷新几次 # 不在刷新 # 返回对应数据 # 2.获取userid对应数据 result = sql_tools.get_human_info(sql_session=sql_session, service_name=service_name, wechat_name=wechat_name) print(result) result = json.loads(result) if human_package_name: result = [_ for _ in result if human_package_name in _['name']] result_ = [] for i in range(len(result)): _ = result[i] _['id'] = i result_.append(_) self.write({'status': {'msg': 'success', "RetCode": 200}, 'human_info': result}) except Exception as e: logging.error(str(e)) try: sql_session.commit() except: pass class ad_wechat_info(BaseHandler): # 1.公众号相关信息获取 def get(self): # TODO:添加分页, # 公众号,服务商,唯一id设计或者获取 # 0.是否刷新 # 1.获取userid,以及是否刷新 user_id = self.get_argument("user_id", None) is_refresh = self.get_argument("is_refresh", None) print(user_id, is_refresh) if user_id is None or is_refresh is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return sql_session = db.DBSession() # TODO:一个涉及到selenium-driver的请求-生命周期.----看一下tornado是怎么处理请求的生命周期 if int(is_refresh) == 1: log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id) threading.Thread(target=user_action.get_human_info, args=( user_id, log_ad, db, cookie_canuse)).start() else: # 1.查看是否在刷新, # 在刷新中, # 返回正在刷新 # -------不管上面逻辑让他们多刷新几次 # 不在刷新 # 返回对应数据 # 2.获取userid对应数据 result = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id) result_list = [] for _ in result: service_name, wechat_name = _ result_list.append({'service_name': service_name, 'wechat_name': wechat_name}) print(result_list) self.write({'status': {'msg': 'success', "RetCode": 200}, 'wechat_info': result_list}) class delete_ad_layout(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) layout_name = self.get_argument('layout_name', None) sql_session = db.DBSession() if user_id is None or layout_name is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 sql_tools.delete_layout_typesetting_vir(sql_session=sql_session, user_id=user_id, typesetting_name=layout_name) self.write({'status': {'msg': 'success', "RetCode": 200}}) class delete_ad_plan(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) plan_name = self.get_argument('plan_name', None) service_name = self.get_argument('service_name', None) wechat_name = self.get_argument('wechat_name', None) sql_session = db.DBSession() if user_id is None or plan_name is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session, user_id=user_id, typesetting_name=plan_name, wechat_name=wechat_name, service_name=service_name) self.write({'status': {'msg': 'success', "RetCode": 200}}) class get_ad_wechat_service_name(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) sql_session = db.DBSession() if user_id is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return result = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id) result_list = [] for _ in result: service_name = _ result_list.append({'service_name': service_name}) print(result_list) self.write({'status': {'msg': 'success', "RetCode": 200}, 'wechat_info': result_list}) class get_ad_wechat_wechat_name(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) service_name = self.get_argument('service_name', None) sql_session = db.DBSession() if user_id is None or service_name is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return result = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session, user_id=user_id, service_name=service_name) result_list = [] for _ in result: service_name, wechat_name = _ result_list.append({'service_name': service_name, 'wechat_name': wechat_name}) print(result_list) self.write({'status': {'msg': 'success', "RetCode": 200}, 'wechat_info': result_list}) class get_plan_action_record(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) service_name = self.get_argument('service_name', None) wechat_name = self.get_argument('wechat_name', None) status = self.get_argument('status', None) plan_name = self.get_argument('plan_name', None) sql_session = db.DBSession() if user_id is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 result = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id, service_name=service_name, wechat_name=wechat_name, status=status, plan_name=plan_name) result_ = [] for i in range(len(result)): print(result[i]) user_id, name, service_name, wechat_name, create_time, status, typesetting, wechat_id_info = result[i] _ = {} _['typesetting'] = json.loads(typesetting) _['ad_plan_name'] = name _['id'] = i _['create_time'] = create_time.strftime("%Y-%m-%d %H:%M:%S") _['service_name'] = service_name _['wechat_name'] = wechat_name _['wechat_id_info'] = wechat_id_info _['status'] = status result_.append(_) self.write({'status': {'msg': 'success', "RetCode": 200}, 'local_ad_plan_info': result_}) class get_all_ad_task(BaseHandler): def get(self): user_id = self.get_argument('user_id', None) sql_session = db.DBSession() if user_id is None: self.write({'status': {'msg': 'url parameter error', "RetCode": 400}}) return # 落地页名字精确到毫秒,默认是全局唯一 result = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id) task_dict = {} localtion = ['wechat', ''] for _ in result: task_name, status, task_status_num, create_time, typesetting = _ print(typesetting) typesetting = json.loads(typesetting) if typesetting['plan_base'][1] == 'pyq': localtion[1] = 'pyq' create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") if task_name not in task_dict.keys(): task_dict[task_name] = {} task_dict[task_name][status] = (task_status_num, create_time) result_ = [] num = 0 for k, v in task_dict.items(): # TODO:修改为dict的sort sum_num = 0 print(k, v) new_dict = {} create_time = None for k_, v_ in v.items(): task_status_num, create_time = v_ sum_num = sum_num + task_status_num new_dict[k_] = task_status_num status = 'todo' if 'todo' in new_dict.keys() else 'done' task_dict[k]['sum_num'] = sum_num new_dict['sum_num'] = sum_num result_.append( {'task_name': k, 'task_info': new_dict, 'create_time': create_time, 'channel': localtion[0], 'localtion': localtion[1], 'id': num, 'status': status}) num = num + 1 print(json.dumps(task_dict)) self.write({'status': {'msg': 'success', "RetCode": 200}, 'local_ad_plan_info': result_}) def heart_jump(): # TODO:tornado 心跳检测,下周做----线程不断检查,线程生命周期60分钟 pass def make_app(): return tornado.web.Application([ ("/get_all_ad_task", get_all_ad_task), # 获取所有任务状态, ("/create_ad_plan", create_ad_plan), # ("/get_ad_wechat_service_name", get_ad_wechat_service_name), ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name), # ("/create_ad_plan_local", create_ad_plan_local), ("/create_ad_layout_local", create_ad_layout_local), ("/get_layout_local", get_ad_layout_local), ("/get_ad_plan_local", get_ad_plan_local), ("/delete_layout_local", delete_ad_layout), ("/delete_ad_plan_local", delete_ad_plan), # ("/create_ad_layout_remote", create_ad_layout_remote), ("/ad_human_info", ad_human_info), ("/ad_wechat_info", ad_wechat_info), ("/get_plan_action_record", get_plan_action_record), ], debug=True, autoreload=True) if __name__ == "__main__": import logging logging.basicConfig( handlers=[ logging.handlers.RotatingFileHandler('./tornado.log', maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8') , logging.StreamHandler() # 供输出使用 ], level=logging.INFO, format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s" ) handler = logging.FileHandler('tornado.log') logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(logging.INFO) app = make_app() app.listen(8888) tornado.ioloop.IOLoop.current().start()