"""Tornado HTTP API for WeChat ad plans, landing-page layouts and ad tasks.

Each handler reads its parameters from the query string or a JSON body,
talks to the database through ``sql_tools`` and answers with a JSON object
whose status entry carries ``msg`` and ``RetCode``.

NOTE(review): several handlers answer under the misspelled key ``'statu'``
instead of ``'status'``.  The spelling is kept as-is because existing
clients may depend on it -- confirm with the front end before renaming.
"""
import json
import pickle
import threading
import time
from contextlib import contextmanager

import tornado.ioloop
import tornado.log
import tornado.web
from sqlalchemy import Table

from settings import using_config
from web_module import user_action
from wechat_action import sql_tools
from wechat_action.login_ad import LogIn
from wechat_action.sql_models import DB
from wechat_api.get_wechat_info import WechatApi

# TODO: add supervisor to keep the process alive.
# TODO: improve the tornado setup when time permits.
# TODO: a deployment tool chain is needed to keep production stable.

db = DB(config=using_config)
wechat_cookies_table = Table('wechat_cookies', db.metadata,
                             autoload=True, autoload_with=db.engine)
layout_typesetting_table = Table('layout_typesetting', db.metadata,
                                 autoload=True, autoload_with=db.engine)
ad_plan_typesetting_table = Table('ad_plan_typesetting', db.metadata,
                                  autoload=True, autoload_with=db.engine)
action_record_table = Table('action_record', db.metadata,
                            autoload=True, autoload_with=db.engine)

# Timestamp layout used for every create/update time sent to clients.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


@contextmanager
def _session_scope():
    """Yield a fresh DB session and always close it afterwards.

    The handlers used to create a session per request and never release it.
    NOTE(review): assumes ``db.DBSession()`` returns an SQLAlchemy session
    (or something else exposing ``close()``) -- confirm in ``sql_models``.
    """
    sql_session = db.DBSession()
    try:
        yield sql_session
    finally:
        sql_session.close()


def _parse_refresh_flag(is_refresh):
    """Return True when ``is_refresh`` equals 1, False for any other integer.

    Returns None when the value is missing or not an integer, so callers can
    answer with a parameter error instead of failing with an exception.
    """
    try:
        return int(is_refresh) == 1
    except (TypeError, ValueError):
        return None


def _plan_typesetting_insert(user_id, plan):
    """Build the statement that stores one ad plan under its ``title``."""
    ad_plan_typesetting_info = {
        'user_id': user_id,
        'name': plan['title'],
        'typesetting': json.dumps(plan, ensure_ascii=False),
    }
    return sql_tools.save_ad_plan_typesetting_info(
        ad_plan_typesetting_info=ad_plan_typesetting_info,
        table_ad_plan_typesetting=ad_plan_typesetting_table)


def _typesetting_rows_to_dicts(rows, name_key):
    """Convert (typesetting, name, create_time, update_time) rows to dicts.

    ``name_key`` is the response key under which the row's name is stored;
    ``id`` is the row's position in ``rows``.
    """
    converted = []
    for index, row in enumerate(rows):
        print(row)
        typesetting, name, create_time, update_time = row
        converted.append({
            'typesetting': json.loads(typesetting),
            name_key: name,
            'id': index,
            'create_time': create_time.strftime(TIME_FORMAT),
            'update_time': update_time.strftime(TIME_FORMAT),
        })
    return converted


def _plan_record_rows_to_dicts(rows):
    """Convert 8-column plan-record rows to response dicts.

    Each row is unpacked as (user_id, name, service_name, wechat_name,
    create_time, status, typesetting, wechat_id_info); ``id`` is the row's
    position in ``rows``.
    """
    converted = []
    for index, row in enumerate(rows):
        print(row)
        (_user_id, name, service_name, wechat_name, create_time, status,
         typesetting, wechat_id_info) = row
        converted.append({
            'typesetting': json.loads(typesetting),
            'ad_plan_name': name,
            'id': index,
            'create_time': create_time.strftime(TIME_FORMAT),
            'service_name': service_name,
            'wechat_name': wechat_name,
            'wechat_id_info': wechat_id_info,
            'status': status,
        })
    return converted


# 1. Implement the local service.
# 2. Implement the online docker-selenium service.
class BaseHandler(tornado.web.RequestHandler):
    """Common base: permissive CORS headers plus small request helpers."""

    def options(self):
        # Empty answer for CORS pre-flight requests; headers come from
        # set_default_headers.
        pass

    def set_default_headers(self):
        self.set_header('Access-Control-Allow-Origin', '*')
        self.set_header('Access-Control-Allow-Headers', '*')
        self.set_header('Access-Control-Max-Age', 1000)
        self.set_header('Content-type', '*')
        self.set_header('Access-Control-Allow-Methods', '*')

    def write_param_error(self):
        """Write the standard "bad parameters" payload (RetCode 400)."""
        self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})

    def read_json_body(self):
        """Return the request body decoded as a JSON object.

        Returns an empty dict when the body is not valid JSON or is not a
        JSON object, so that missing keys surface as parameter errors rather
        than as unhandled exceptions.
        """
        try:
            payload = json.loads(self.request.body)
        except ValueError:  # includes json.JSONDecodeError / bad encoding
            return {}
        return payload if isinstance(payload, dict) else {}


class create_ad_plan_remote(BaseHandler):
    # 1. Batch-create plans.
    # Reports whether plan creation has started.
    # TODO: landing-page creation belongs in here as well.
    def post(self):
        """Refresh the login state and create the plan in a worker thread."""
        user_id = self.get_argument("user_id", None)
        ad_plan_name = self.get_argument("ad_plan_name", None)
        # wechat_json: [{'service_name': 'one', 'wechat_name': ''},
        #               {'service_name': '', 'wechat_name': ''}]
        wechat_json = self.get_argument('wechat_json', None)
        if user_id is None:
            self.write_param_error()
            return
        log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(
            self, user_id=user_id)
        threading.Thread(
            target=user_action.create_ad_plan,
            args=(user_id, ad_plan_name, wechat_json, log_ad, db,
                  cookie_canuse)).start()


class create_ad_plan_local(BaseHandler):
    def post(self):
        """Store every plan of the JSON body's ``plan_list`` in the database."""
        request_dict = self.read_json_body()
        user_id = request_dict.get('user_id')
        ad_plan_list = request_dict.get('plan_list')
        print(user_id, ad_plan_list)
        if user_id is None or ad_plan_list is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        # TODO: verify the plan content -- confirm the format with the
        # front end.
        with _session_scope() as sql_session:
            for plan in ad_plan_list:
                sql_session.execute(_plan_typesetting_insert(user_id, plan))
            sql_session.commit()
        self.write({'status': {'msg': 'success', "RetCode": 200}})


class create_ad_plan(BaseHandler):
    # TODO: the database must not be altered while tornado is running --
    # work out how to handle this when deploying.
    def post(self):
        """Store the submitted plans and queue one action record per step.

        The response itself is written by ``refresh_wechat_cookies``.
        """
        request_dict = self.read_json_body()
        print(request_dict)
        user_id = request_dict.get('user_id')
        # TODO: make task_name globally unique.
        ad_plan_list = request_dict.get('plan_list')
        print(user_id, ad_plan_list)
        # Validate before refreshing cookies: the refresh starts a login and
        # already writes a response, so a later error payload would be
        # appended to it.
        if user_id is None or ad_plan_list is None:
            self.write_param_error()
            return

        # 1. Check whether the stored cookie is still usable.
        log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(
            self, user_id=user_id)

        # 2. Persist the data.
        task_name = '{user_id}_{time_sign}'.format(user_id=user_id,
                                                   time_sign=time.time())
        with _session_scope() as sql_session:
            # 2.1 Store the plan data.
            for plan in ad_plan_list:
                sql_session.execute(_plan_typesetting_insert(user_id, plan))

            # 2.2 Store the action history.
            for plan in ad_plan_list:
                # 1. Check whether the history already holds the landing page.
                # TODO: store landing pages directly in action_record and
                # act per official account at run time.
                # 2. History records.
                print(plan)
                for action_type in ['create_ad_plan', 'create_ad_layout']:
                    if action_type == 'create_ad_plan':
                        object_name = plan['title']
                    else:
                        object_name = plan['idea']['jump_type_page_type']
                    action_info = {
                        'user_id': user_id,
                        'service_name': plan['service_name'],
                        'wechat_name': plan['wechat_name'],
                        'action_type': action_type,
                        'object_name': object_name,
                        'task_name': task_name,
                        'status': 'todo',
                    }
                    record_insert = sql_tools.save_action_record(
                        action_record_info=action_info,
                        table_action_record=action_record_table)
                    sql_session.execute(record_insert)
            sql_session.commit()
        # 4. Start running.
        # TODO: not wired up yet; the intended call was
        # threading.Thread(target=user_action.create_ad_plan,
        #                  args=(user_id, ad_plan_name, wechat_json, log_ad,
        #                        db, cookie_canuse)).start()


class get_ad_plan_local(BaseHandler):
    def get(self):
        """Return the stored ad plans of ``user_id``, filtered by name."""
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('plan_name', None)
        if user_id is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        # TODO: make the query cheaper later; LIKE is too slow.  A missing or
        # empty name is sent as '' to the same rough lookup.
        with _session_scope() as sql_session:
            result = sql_tools.get_plan_typesetting_rough(
                sql_session=sql_session, user_id=user_id,
                typesetting_name=layout_name or '')
            print(result)
            result_ = _typesetting_rows_to_dicts(result, 'ad_plan_name')
        self.write({'statu': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': result_})


class create_ad_layout_remote(BaseHandler):
    # 1. Batch-create landing pages.
    def post(self):
        """Refresh the login state and create the layout in a worker thread."""
        user_id = self.get_argument("user_id", None)
        layout_name = self.get_argument("layout_name", None)
        # wechat_json: [{'service_name': 'one', 'wechat_name': ''},
        #               {'service_name': '', 'wechat_name': ''}]
        wechat_json = self.get_argument('wechat_json', None)
        if user_id is None:
            self.write_param_error()
            return
        log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(
            self, user_id=user_id)
        threading.Thread(
            target=user_action.create_layout,
            args=(user_id, layout_name, wechat_json, log_ad, db,
                  cookie_canuse)).start()


class create_ad_layout_local(BaseHandler):
    def post(self):
        """Store one landing-page layout taken from the JSON body."""
        # TODO: report back when layout_name already exists.
        request_dict = self.read_json_body()
        user_id = request_dict.get('user_id')
        layout_typesetting = request_dict.get('layout_typesetting')
        layout_name = request_dict.get('layout_name')
        print(user_id, layout_typesetting, layout_name)
        print('layout-typesetting', type(layout_typesetting),
              layout_typesetting)
        if (user_id is None or layout_name is None
                or layout_typesetting is None):
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        layout_typesetting_info = {'user_id': user_id, 'name': layout_name,
                                   'typesetting': layout_typesetting}
        layout_typesetting_inserte = sql_tools.save_layout_typesetting_info(
            layout_typesetting_info=layout_typesetting_info,
            table_layout_typesetting=layout_typesetting_table)
        with _session_scope() as sql_session:
            sql_session.execute(layout_typesetting_inserte)
            sql_session.commit()
        self.write({'status': {'msg': 'success', "RetCode": 200}})


class get_ad_layout_local(BaseHandler):
    def get(self):
        """Return the stored layouts of ``user_id``, filtered by name."""
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('layout_name', None)
        if user_id is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        # TODO: make the query cheaper later; LIKE is too slow.  A missing or
        # empty name is sent as '' to the same rough lookup.
        with _session_scope() as sql_session:
            result = sql_tools.get_layout_typesetting_rough(
                sql_session=sql_session, user_id=user_id,
                typesetting_name=layout_name or '')
            print(result)
            result_ = _typesetting_rows_to_dicts(result, 'layout_name')
        self.write({'statu': {'msg': 'success', "RetCode": 200},
                    'local_layout_info': result_})


# TODO: the wechat_info and human_info tables need matching improvements
# when time permits.
class ad_status(BaseHandler):
    def get(self):
        """Return the action status rows recorded for ``user_id``."""
        user_id = self.get_argument("user_id", None)
        if user_id is None:
            self.write_param_error()
            return
        result = []
        with _session_scope() as sql_session:
            lines = sql_tools.get_ad_status(sql_session=sql_session,
                                            user_id=user_id)
            for line in lines:
                (action_type, wechat_name, service_name, update_time,
                 create_time, status) = line
                # NOTE(review): action_type is decoded as JSON here; confirm
                # the column really stores JSON text.
                result.append({
                    'action_type': json.loads(action_type),
                    'wechat_name': wechat_name,
                    'service_name': service_name,
                    'update_time': update_time.strftime(TIME_FORMAT),
                    'create_time': create_time.strftime(TIME_FORMAT),
                    'status': status,
                })
        print(result)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'ad_status_info': result})


class ad_human_info(BaseHandler):
    # TODO: do not allow refreshing again within a short time.
    @staticmethod
    def refresh_wechat_cookies(tornado_web, user_id):
        """Check the stored cookies of ``user_id`` and log in when needed.

        Writes a success payload to ``tornado_web``; when a new login is
        started the payload also carries the ``wechat_code`` returned by
        ``LogIn.log_in()``.

        Returns:
            ``(log_ad, cookie_canuse)`` -- the ``LogIn`` instance and whether
            the stored cookies passed ``wechat_cookies_check_alive``.
        """
        # TODO: add an interactive endpoint and a status field that flips as
        # soon as selenium is opened.
        # 1. Return the QR-code link.
        # ---- 1. Check whether the cookie is usable.
        with _session_scope() as sql_session:
            cookie_db = sql_tools.get_wechat_cookies(sql_session,
                                                     user_id=user_id)
        # Perform the login step; this makes the driver usable.
        log_ad = LogIn()
        cookie_canuse = False
        if cookie_db:
            # SECURITY NOTE(review): unpickling executes arbitrary code if
            # the stored blob can be tampered with; consider a JSON format.
            cookie_db = pickle.loads(cookie_db)
            # TODO: logging needs a proper configuration.
            if not log_ad.wechat_cookies_check_alive(cookie_db):
                # The cookie is no longer usable.
                wechat_code = log_ad.log_in()
                tornado_web.write({'status': {'msg': 'success',
                                              "RetCode": 200},
                                   'wechat_code': wechat_code})
                print('cookie can not use')
            else:
                # The cookie can still be used.
                cookie_canuse = True
                log_ad.driver.get('https://a.weixin.qq.com/index.html')
                tornado_web.write({'status': {'msg': 'success',
                                              "RetCode": 200}})
        else:
            # No cookie stored: a login is required.
            wechat_code = log_ad.log_in()
            tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
                               'wechat_code': wechat_code})
        return log_ad, cookie_canuse

    # 1. Fetch the audience ("human") packages.
    def get(self):
        """Return stored audience packages, or start a refresh.

        With ``is_refresh`` equal to 1 the cookies are refreshed and
        ``user_action.get_human_info`` is started in a thread; otherwise the
        stored packages are returned, optionally filtered by
        ``human_package_name``.
        """
        # TODO: add pagination.
        # 0. Refresh or not.
        # 1. Read the user id and the refresh flag.
        user_id = self.get_argument("user_id", None)
        human_package_name = self.get_argument('human_package_name', None)
        is_refresh = self.get_argument("is_refresh", None)
        wechat_name = self.get_argument('wechat_name', None)
        service_name = self.get_argument('service_name', None)
        print(user_id, is_refresh)
        refresh = _parse_refresh_flag(is_refresh)
        if (user_id is None or refresh is None or wechat_name is None
                or service_name is None):
            self.write_param_error()
            return
        # TODO: life cycle of a request that involves the selenium driver --
        # look into how tornado handles the request life cycle.
        if refresh:
            log_ad, cookie_canuse = self.refresh_wechat_cookies(
                self, user_id=user_id)
            threading.Thread(
                target=user_action.get_human_info,
                args=(user_id, log_ad, db, cookie_canuse)).start()
            return
        # 1. Check whether a refresh is running:
        #      running     -> answer "refreshing"
        #      (ignored for now; let clients refresh a few more times)
        #      not running -> answer with the data
        # 2. Fetch the stored data.
        with _session_scope() as sql_session:
            result = sql_tools.get_human_info(sql_session=sql_session,
                                              service_name=service_name,
                                              wechat_name=wechat_name)
        print(result)
        result = json.loads(result)
        if human_package_name:
            result = [package for package in result
                      if human_package_name in package['name']]
        for index, package in enumerate(result):
            package['id'] = index
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'human_info': result})


class ad_wechat_info(BaseHandler):
    # 1. Fetch information about the official accounts.
    def get(self):
        """Return stored (service_name, wechat_name) pairs, or refresh."""
        # TODO: add pagination.
        # Official account, service provider: design or obtain a unique id.
        # 0. Refresh or not.
        # 1. Read the user id and the refresh flag.
        user_id = self.get_argument("user_id", None)
        is_refresh = self.get_argument("is_refresh", None)
        print(user_id, is_refresh)
        refresh = _parse_refresh_flag(is_refresh)
        if user_id is None or refresh is None:
            self.write_param_error()
            return
        # TODO: life cycle of a request that involves the selenium driver --
        # look into how tornado handles the request life cycle.
        if refresh:
            log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(
                self, user_id=user_id)
            # NOTE(review): this starts get_human_info although the handler
            # serves wechat info -- confirm that is the intended target.
            threading.Thread(
                target=user_action.get_human_info,
                args=(user_id, log_ad, db, cookie_canuse)).start()
            return
        # 1. Check whether a refresh is running:
        #      running     -> answer "refreshing"
        #      (ignored for now; let clients refresh a few more times)
        #      not running -> answer with the data
        # 2. Fetch the stored data for the user id.
        with _session_scope() as sql_session:
            result = sql_tools.get_wechat_info(sql_session=sql_session,
                                               user_id=user_id)
            result_list = [
                {'service_name': service_name, 'wechat_name': wechat_name}
                for service_name, wechat_name in result
            ]
        print(result_list)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'wechat_info': result_list})


class delete_ad_layout(BaseHandler):
    def get(self):
        """Delete the layout ``layout_name`` of ``user_id`` via sql_tools."""
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('layout_name', None)
        if user_id is None or layout_name is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        with _session_scope() as sql_session:
            sql_tools.delete_layout_typesetting_vir(
                sql_session=sql_session, user_id=user_id,
                typesetting_name=layout_name)
        self.write({'status': {'msg': 'success', "RetCode": 200}})


class delete_ad_plan(BaseHandler):
    def get(self):
        """Delete the plan ``plan_name`` of ``user_id`` via sql_tools."""
        user_id = self.get_argument('user_id', None)
        plan_name = self.get_argument('plan_name', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        if user_id is None or plan_name is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        with _session_scope() as sql_session:
            sql_tools.delete_ad_plan_typesetting_vir(
                sql_session=sql_session, user_id=user_id,
                typesetting_name=plan_name, wechat_name=wechat_name,
                service_name=service_name)
        self.write({'status': {'msg': 'success', "RetCode": 200}})


class get_ad_wechat_service_name(BaseHandler):
    def get(self):
        """Return the service names stored for ``user_id``."""
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write_param_error()
            return
        with _session_scope() as sql_session:
            result = sql_tools.get_wechat_info_service_name(
                sql_session=sql_session, user_id=user_id)
            # Each result item is passed through unchanged as service_name.
            result_list = [{'service_name': service_name}
                           for service_name in result]
        print(result_list)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'wechat_info': result_list})


class get_ad_wechat_wechat_name(BaseHandler):
    def get(self):
        """Return the (service_name, wechat_name) pairs of one service."""
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        if user_id is None or service_name is None:
            self.write_param_error()
            return
        with _session_scope() as sql_session:
            result = sql_tools.get_wechat_info_wechat_name(
                sql_session=sql_session, user_id=user_id,
                service_name=service_name)
            result_list = [
                {'service_name': row_service_name,
                 'wechat_name': row_wechat_name}
                for row_service_name, row_wechat_name in result
            ]
        print(result_list)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'wechat_info': result_list})


class get_plan_action_record(BaseHandler):
    def get(self):
        """Return plan records of ``user_id`` matching the given filters."""
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        status = self.get_argument('status', None)
        plan_name = self.get_argument('plan_name', None)
        if user_id is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        with _session_scope() as sql_session:
            result = sql_tools.get_plan_record(
                sql_session=sql_session, user_id=user_id,
                service_name=service_name, wechat_name=wechat_name,
                status=status, plan_name=plan_name)
            result_ = _plan_record_rows_to_dicts(result)
        self.write({'statu': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': result_})


class get_task_list(BaseHandler):
    def get(self):
        """Return the ad-task rows of ``user_id`` as plan-record dicts."""
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        # NOTE(review): rows from sql_tools.get_ad_task are unpacked as
        # 8 columns here but as 4 columns in get_all_ad_task; one of the two
        # handlers presumably uses the wrong query -- verify.
        with _session_scope() as sql_session:
            result = sql_tools.get_ad_task(sql_session=sql_session,
                                           user_id=user_id)
            result_ = _plan_record_rows_to_dicts(result)
        self.write({'statu': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': result_})


class get_all_ad_task(BaseHandler):
    # TODO: add two fields: WeChat and Moments.
    def get(self):
        """Return per-task status counts for ``user_id``.

        Rows of (task_name, status, task_status_num, create_time) are grouped
        by task; every task reports the count per status plus ``sum_num``,
        the total over all of its statuses.
        """
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write_param_error()
            return
        # Landing-page names are precise to the millisecond and are assumed
        # to be globally unique.
        task_dict = {}
        with _session_scope() as sql_session:
            result = sql_tools.get_ad_task(sql_session=sql_session,
                                           user_id=user_id)
            for task_name, status, task_status_num, create_time in result:
                task_dict.setdefault(task_name, {})[status] = (
                    task_status_num, create_time.strftime(TIME_FORMAT))

        result_ = []
        # TODO: switch to a sorted dict.
        for num, (task_name, status_map) in enumerate(task_dict.items()):
            print(task_name, status_map)
            task_info = {}
            # Ends up as the create_time of the last status seen for a task.
            create_time = None
            sum_num = 0
            for status, (task_status_num, create_time) in status_map.items():
                sum_num = sum_num + task_status_num
                task_info[status] = task_status_num
            status_map['sum_num'] = sum_num
            task_info['sum_num'] = sum_num
            result_.append({'task_name': task_name, 'task_info': task_info,
                            'create_time': create_time, 'id': num})
        print(json.dumps(task_dict))
        self.write({'statu': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': result_})


def make_app():
    """Build the tornado application with all enabled routes.

    Routes that are currently switched off are listed as comments.
    """
    return tornado.web.Application([
        ("/get_all_ad_task", get_all_ad_task),
        ("/create_ad_plan", create_ad_plan),
        ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
        ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
        # ("/create_ad_plan_local", create_ad_plan_local),
        # ("/create_ad_layout_local", create_ad_layout_local),
        ("/get_layout_local", get_ad_layout_local),
        ("/get_ad_plan_local", get_ad_plan_local),
        ("/delete_layout_local", delete_ad_layout),
        ("/delete_ad_plan_local", delete_ad_plan),
        # ("/create_ad_plan_remote", create_ad_plan_remote),
        # ("/create_ad_layout_remote", create_ad_layout_remote),
        ("/ad_human_info", ad_human_info),
        ("/ad_wechat_info", ad_wechat_info),
        # ("/ad_status", ad_status),
        ("/get_plan_action_record", get_plan_action_record),
        ("/get_task_list", get_task_list),
    ], debug=True, autoreload=True)


if __name__ == "__main__":
    # NOTE(review): this only instantiates a formatter and discards it, so it
    # does not configure logging; tornado.log.enable_pretty_logging() is
    # probably what was intended -- confirm before changing.
    tornado.log.LogFormatter()
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()