|
import json
import logging
import pickle
import threading
from datetime import datetime
from logging import handlers

import tornado.ioloop
import tornado.log
import tornado.web
from sqlalchemy import Table

from settings import using_config
from web_module import user_action
from wechat_action import sql_tools
from wechat_action.login_ad import LogIn
from wechat_action.sql_models import DB
# TODO: add supervisor to keep the process alive
# TODO: improve the tornado setup when there is time
# TODO: a deployment toolchain is needed to keep production stable
db = DB(config=using_config)


def _reflect_table(table_name):
    """Reflect ``table_name`` from the engine behind ``db`` into ``db.metadata``."""
    return Table(table_name, db.metadata,
                 autoload=True, autoload_with=db.engine)


# Reflected tables used by the handlers below.
wechat_cookies_table = _reflect_table('wechat_cookies')
layout_typesetting_table = _reflect_table('layout_typesetting')
ad_plan_typesetting_table = _reflect_table('ad_plan_typesetting')
action_record_table = _reflect_table('action_record')

# Values stored in the ``action_type`` field of action records.
layout_create_action = 'create_ad_layout'
ad_plan_create_action = 'create_ad_plan'
refresh_wechat_action = 'refresh_wechat_info'

# 1. implement the local service
# 2. implement the online docker-selenium service
class BaseHandler(tornado.web.RequestHandler):
    """Shared base handler: sets wildcard CORS headers and answers OPTIONS with an empty body."""

    # (name, value) pairs applied to every response, in this order.
    # NOTE(review): 'Content-type: *' is not a valid media type -- confirm
    # whether this header is intentional.
    _DEFAULT_HEADERS = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Headers', '*'),
        ('Access-Control-Max-Age', 1000),
        ('Content-type', '*'),
        ('Access-Control-Allow-Methods', '*'),
    )

    def set_default_headers(self):
        """Apply every entry of ``_DEFAULT_HEADERS`` to the response."""
        for header_name, header_value in self._DEFAULT_HEADERS:
            self.set_header(header_name, header_value)

    def options(self):
        """Accept OPTIONS requests without writing a body."""
        return None
class create_ad_plan_local(BaseHandler):
    """Store submitted ad plans as typesetting rows without executing them."""

    def post(self):
        """Persist every plan of the JSON body's ``plan_list`` for ``user_id``.

        The body must be a JSON object with ``user_id`` and ``plan_list``
        (a list of plan objects, each with a ``title``). Writes RetCode 400
        when either key is missing or null, RetCode 200 once all plans have
        been committed. Any database error is re-raised after a rollback.
        """
        # json.loads() decodes bytes itself; its ``encoding`` keyword was
        # removed in Python 3.9 and raises TypeError there.
        request_dict = json.loads(self.request.body)
        # .get() lets a missing key reach the 400 branch instead of raising
        # KeyError before the validation runs.
        user_id = request_dict.get('user_id')
        ad_plan_list = request_dict.get('plan_list')
        logging.info('create_ad_plan_local user_id=%s plan_list=%s', user_id, ad_plan_list)
        if user_id is None or ad_plan_list is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        sql_session = db.DBSession()
        try:
            for plan in ad_plan_list:
                typesetting_info = {
                    'user_id': user_id,
                    'name': plan['title'],
                    'typesetting': json.dumps(plan, ensure_ascii=False),
                }
                insert_stmt = sql_tools.save_ad_plan_typesetting_info(
                    ad_plan_typesetting_info=typesetting_info,
                    table_ad_plan_typesetting=ad_plan_typesetting_table)
                sql_session.execute(insert_stmt)
            # One commit for the whole batch so a failing plan does not leave
            # a partially stored list behind.
            sql_session.commit()
        except Exception:
            sql_session.rollback()
            raise
        finally:
            # Always release the session's connection.
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200}})
class create_ad_plan(BaseHandler):
    """Persist a batch of ad plans and, when no task is in hand, start carrying them out."""

    # TODO: the database must not be modified while tornado is running -- decide how to operate after going live
    # TODO: name validation -- keep only three symbols (.-_); a Chinese character counts as length one, a digit as length two

    @staticmethod
    def check_task(user_id):
        """Return the result of ``sql_tools.get_task_in_hand_num`` for ``user_id``.

        Callers in this file treat a truthy result as "a task for this user
        is already in hand".
        """
        # TODO: check whether a task of the same user_id is running; if so only save the task and do nothing else
        sql_session = db.DBSession()
        try:
            return sql_tools.get_task_in_hand_num(user_id, sql_session)
        finally:
            sql_session.close()

    def save_task_info(self, user_id, ad_plan_list, sql_session, task_name):
        """Store the plan typesettings and the matching action records.

        For every plan one typesetting row is inserted, followed by two
        action records with status ``'todo'``: one for the layout creation
        and one for the plan creation. Writes a RetCode 400 response and
        returns without touching the database when ``user_id`` or
        ``ad_plan_list`` is None.

        The caller owns ``sql_session``; this method commits on it but does
        not close it.
        """
        if user_id is None or ad_plan_list is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # 2.1 store the plan data
        for plan in ad_plan_list:
            typesetting_info = {
                'user_id': user_id,
                'name': plan['title'],
                'typesetting': json.dumps(plan, ensure_ascii=False),
            }
            logging.info('typesetting_info %s', typesetting_info)
            sql_session.execute(sql_tools.save_ad_plan_typesetting_info(
                ad_plan_typesetting_info=typesetting_info,
                table_ad_plan_typesetting=ad_plan_typesetting_table))
        sql_session.commit()

        # 2.2 store the action records
        for plan in ad_plan_list:
            for action_type in (layout_create_action, ad_plan_create_action):
                if action_type == ad_plan_create_action:
                    object_name = plan['title']
                else:
                    object_name = plan['idea']['jump_type_page_type']['layout_name']
                action_info = {
                    'user_id': user_id,
                    'service_name': plan['service_name'],
                    'wechat_name': plan['wechat_name'],
                    'action_type': action_type,
                    'object_name': object_name,
                    'task_name': task_name,
                    'status': 'todo',
                }
                sql_session.execute(sql_tools.save_action_record(
                    action_record_info=action_info,
                    table_action_record=action_record_table))
        sql_session.commit()

    def post(self):
        """Save the submitted plans; start a worker thread if no task is in hand.

        The body must be a JSON object with ``user_id`` and ``plan_list``.
        When ``check_task`` is falsy the WeChat cookies are refreshed (which
        writes the response), the task is saved and
        ``user_action.carry_plan`` is started on a new thread. Otherwise the
        task is only saved and no response body is written.

        Errors are logged with their traceback and re-raised so Tornado
        produces its usual error response.
        """
        sql_session = db.DBSession()
        try:
            # ``encoding`` was removed from json.loads() in Python 3.9.
            request_dict = json.loads(self.request.body)
            logging.info('create_ad_plan request: %s', request_dict)
            ad_plan_list = request_dict.get('plan_list')
            user_id = request_dict.get('user_id')
            # Validate before any login/browser work is triggered.
            if user_id is None or ad_plan_list is None:
                self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
                return

            task_name = 'user_id: {user_id} time:{time_sign} action:create_plan'.format(
                user_id=user_id,
                time_sign=datetime.now().strftime("%Y-%m-%d, %H:%M:%S"))

            if not self.check_task(user_id=user_id):
                # 1. check whether the stored cookie is usable
                log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
                self.save_task_info(user_id, ad_plan_list, sql_session, task_name)
                # 4. start running
                threading.Thread(
                    target=user_action.carry_plan,
                    args=(user_id, ad_plan_list, log_ad, db, cookie_canuse, task_name)).start()
            else:
                self.save_task_info(user_id, ad_plan_list, sql_session, task_name)
        except Exception:
            logging.exception('create_ad_plan request failed')
            sql_session.rollback()
            raise
        finally:
            sql_session.close()
class get_ad_plan_local(BaseHandler):
    """List the stored ad-plan typesettings of a user."""

    def get(self):
        """Write the user's plan typesettings, optionally filtered by ``plan_name``.

        Query arguments: ``user_id`` (required) and ``plan_name`` (optional).
        Writes RetCode 400 when ``user_id`` is absent; otherwise RetCode 200
        with ``local_ad_plan_info``, a list whose ``id`` is the row's
        position in the query result.
        """
        user_id = self.get_argument('user_id', None)
        # A missing or empty plan_name is passed to the lookup as ''.
        # TODO: improve the query later; LIKE is too slow
        plan_name = self.get_argument('plan_name', None) or ''
        if user_id is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_plan_typesetting_rough(
                sql_session=sql_session, user_id=user_id, typesetting_name=plan_name)
            plans = [
                {
                    'typesetting': json.loads(typesetting),
                    'ad_plan_name': name,
                    'id': index,
                    'create_time': create_time.strftime("%Y-%m-%d %H:%M:%S"),
                    'update_time': update_time.strftime("%Y-%m-%d %H:%M:%S"),
                }
                for index, (typesetting, name, create_time, update_time) in enumerate(rows)
            ]
        finally:
            # Always release the session's connection.
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': plans})
class create_ad_layout_local(BaseHandler):
    """Store one layout typesetting for a user."""

    def post(self):
        """Insert the layout described by the JSON body.

        The body must be a JSON object with ``user_id``, ``layout_name`` and
        ``layout_typesetting``; the typesetting value is stored as received.
        Writes RetCode 400 when any of the three is missing or null, RetCode
        200 after the row has been committed. Database errors are re-raised
        after a rollback.
        """
        # TODO: return a message when layout_name is a duplicate
        request_dict = json.loads(self.request.body)
        # .get() lets a missing key reach the 400 branch instead of raising
        # KeyError before the validation runs.
        user_id = request_dict.get('user_id')
        layout_typesetting = request_dict.get('layout_typesetting')
        layout_name = request_dict.get('layout_name')
        logging.info('create_ad_layout_local user_id=%s layout_name=%s typesetting(%s)=%s',
                     user_id, layout_name, type(layout_typesetting), layout_typesetting)
        if user_id is None or layout_name is None or layout_typesetting is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        layout_typesetting_info = {
            'user_id': user_id,
            'name': layout_name,
            'typesetting': layout_typesetting,
        }
        sql_session = db.DBSession()
        try:
            insert_stmt = sql_tools.save_layout_typesetting_info(
                layout_typesetting_info=layout_typesetting_info,
                table_layout_typesetting=layout_typesetting_table)
            sql_session.execute(insert_stmt)
            sql_session.commit()
        except Exception:
            sql_session.rollback()
            raise
        finally:
            # Always release the session's connection.
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200}})
class get_ad_layout_local(BaseHandler):
    """List the stored layout typesettings of a user."""

    def get(self):
        """Write the user's layout typesettings, optionally filtered by ``layout_name``.

        Query arguments: ``user_id`` (required) and ``layout_name``
        (optional). Writes RetCode 400 when ``user_id`` is absent; otherwise
        RetCode 200 with ``local_layout_info``, a list whose ``id`` is the
        row's position in the query result.
        """
        user_id = self.get_argument('user_id', None)
        # A missing or empty layout_name is passed to the lookup as ''.
        # TODO: improve the query later; LIKE is too slow
        layout_name = self.get_argument('layout_name', None) or ''
        if user_id is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_layout_typesetting_rough(
                sql_session=sql_session, user_id=user_id, typesetting_name=layout_name)
            layouts = [
                {
                    'typesetting': json.loads(typesetting),
                    'layout_name': name,
                    'id': index,
                    'create_time': create_time.strftime("%Y-%m-%d %H:%M:%S"),
                    'update_time': update_time.strftime("%Y-%m-%d %H:%M:%S"),
                }
                for index, (typesetting, name, create_time, update_time) in enumerate(rows)
            ]
        finally:
            # Always release the session's connection.
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'local_layout_info': layouts})
# TODO: the wechat_info and human_info tables need matching improvements when there is time
# TODO: the behavior of ad_human_info and ad_wechat_info needs to interact with create_ad_plan
class ad_human_info(BaseHandler):
    """Serve stored audience-package ("human") info, or trigger a refresh of it."""

    # TODO: set up update---table; if it fails the sql_session needs to be closed
    @staticmethod
    def refresh_wechat_cookies(tornado_web, user_id):
        """Make a ``LogIn`` ready for ``user_id`` and write the login status.

        Loads the stored cookies for the user. When they exist and
        ``wechat_cookies_check_alive`` accepts them, the driver is pointed at
        the WeChat ads index page and a plain success response is written.
        Otherwise ``log_in()`` is called and its result is written as
        ``wechat_code``.

        Args:
            tornado_web: the request handler whose ``write`` receives the response.
            user_id: user whose cookies are looked up.

        Returns:
            ``(log_ad, cookie_canuse)`` -- the ``LogIn`` object and whether
            the stored cookies were usable.
        """
        # TODO: add an interactive endpoint and a status field that changes once selenium is opened
        sql_session = db.DBSession()
        try:
            cookie_blob = sql_tools.get_wechat_cookies(sql_session, user_id=user_id)
        finally:
            sql_session.close()

        log_ad = LogIn(user_id=user_id)
        cookie_canuse = False
        if cookie_blob:
            # NOTE(review): pickle.loads runs arbitrary code from its input;
            # this is only safe while the cookie table is fully trusted.
            cookies = pickle.loads(cookie_blob)
            cookie_canuse = bool(log_ad.wechat_cookies_check_alive(cookies))

        if cookie_canuse:
            # The cookie can still be used.
            log_ad.driver.get('https://a.weixin.qq.com/index.html')
            tornado_web.write({'status': {'msg': 'success', "RetCode": 200}})
        else:
            # No usable cookie: start a login and hand its code to the client.
            wechat_code = log_ad.log_in()
            tornado_web.write({'status': {'msg': 'success', "RetCode": 200},
                               'wechat_code': wechat_code})
        return log_ad, cookie_canuse

    # 1. fetch the audience packages
    def get(self):
        """Return stored audience info, or start a refresh when ``is_refresh`` is 1.

        Query arguments: ``user_id``, ``is_refresh``, ``wechat_name`` and
        ``service_name`` are required (RetCode 400 otherwise);
        ``human_package_name`` optionally filters the result by substring
        match on each entry's ``name``.

        With ``is_refresh == 1``: if a task is already in hand an empty-msg
        RetCode 200 is written; otherwise the cookies are refreshed, a
        ``'todo'`` action record is stored and
        ``user_action.get_human_info`` is started on a new thread.

        Any exception is logged with its traceback, the session is rolled
        back and a RetCode 500 envelope replaces whatever had been written.
        """
        sql_session = db.DBSession()
        try:
            user_id = self.get_argument("user_id", None)
            human_package_name = self.get_argument('human_package_name', None)
            is_refresh = self.get_argument("is_refresh", None)
            wechat_name = self.get_argument('wechat_name', None)
            service_name = self.get_argument('service_name', None)
            logging.info('ad_human_info user_id=%s is_refresh=%s', user_id, is_refresh)
            if user_id is None or is_refresh is None or wechat_name is None or service_name is None:
                self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
                return

            # TODO: lifecycle of a request that involves the selenium driver -- look at how tornado handles the request lifecycle
            if int(is_refresh) == 1:
                if create_ad_plan.check_task(user_id=user_id):
                    self.write({'status': {'msg': '', "RetCode": 200}})
                    return
                log_ad, cookie_canuse = self.refresh_wechat_cookies(self, user_id=user_id)
                task_name = 'user_id: {user_id} time:{time_sign} action:refresh_wechat_info'.format(
                    user_id=user_id,
                    time_sign=datetime.now().strftime("%Y-%m-%d, %H:%M:%S"))
                # Action record; the refresh is stored with empty service,
                # account and object names.
                action_info = {
                    'user_id': user_id,
                    'service_name': '',
                    'wechat_name': '',
                    'action_type': refresh_wechat_action,
                    'object_name': '',
                    'task_name': task_name,
                    'status': 'todo',
                }
                sql_session.execute(sql_tools.save_action_record(
                    action_record_info=action_info,
                    table_action_record=action_record_table))
                sql_session.commit()
                threading.Thread(target=user_action.get_human_info,
                                 args=(user_id, log_ad, db, cookie_canuse)).start()
                return

            # Not refreshing: return the stored data for this account.
            stored = sql_tools.get_human_info(sql_session=sql_session,
                                              service_name=service_name,
                                              wechat_name=wechat_name)
            packages = json.loads(stored)
            if human_package_name:
                packages = [pkg for pkg in packages if human_package_name in pkg['name']]
            for index, pkg in enumerate(packages):
                pkg['id'] = index
            self.write({'status': {'msg': 'success', "RetCode": 200},
                        'human_info': packages})
        except Exception:
            logging.exception('ad_human_info request failed')
            sql_session.rollback()
            # Drop any partially written body so the client receives one
            # well-formed JSON object.
            self.clear()
            self.write({'status': {'msg': 'internal error', "RetCode": 500}})
        finally:
            sql_session.close()
class ad_wechat_info(BaseHandler):
    """Serve stored official-account info, or trigger a refresh of it."""

    # 1. fetch official-account related information
    def get(self):
        """Return the user's account list, or start a refresh when ``is_refresh`` is 1.

        Query arguments: ``user_id`` and ``is_refresh`` are required
        (RetCode 400 otherwise).

        With ``is_refresh == 1``: if a task is already in hand an empty-msg
        RetCode 200 is written; otherwise the cookies are refreshed, a
        ``'todo'`` action record is stored and
        ``user_action.get_human_info`` is started on a new thread.
        Otherwise ``wechat_info`` is written as a list of
        ``{'service_name', 'wechat_name'}`` objects.

        Any exception is logged with its traceback, the session is rolled
        back and a RetCode 500 envelope replaces whatever had been written.
        """
        # TODO: add pagination
        # Design or obtain a unique id for account / service provider.
        user_id = self.get_argument("user_id", None)
        is_refresh = self.get_argument("is_refresh", None)
        logging.info('ad_wechat_info user_id=%s is_refresh=%s', user_id, is_refresh)
        if user_id is None or is_refresh is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Created outside the try block so the cleanup below can always
        # refer to it.
        sql_session = db.DBSession()
        try:
            if int(is_refresh) == 1:
                # If another task is being processed, do not start a new one.
                if create_ad_plan.check_task(user_id=user_id):
                    self.write({'status': {'msg': '', "RetCode": 200}})
                    return
                log_ad, cookie_canuse = ad_human_info.refresh_wechat_cookies(self, user_id=user_id)
                task_name = 'user_id: {user_id} time:{time_sign} action:refresh_wechat_info'.format(
                    user_id=user_id,
                    time_sign=datetime.now().strftime("%Y-%m-%d, %H:%M:%S"))
                # Action record; the refresh is stored with empty service,
                # account and object names.
                action_info = {
                    'user_id': user_id,
                    'service_name': '',
                    'wechat_name': '',
                    'action_type': refresh_wechat_action,
                    'object_name': '',
                    'task_name': task_name,
                    'status': 'todo',
                }
                sql_session.execute(sql_tools.save_action_record(
                    action_record_info=action_info,
                    table_action_record=action_record_table))
                sql_session.commit()
                threading.Thread(target=user_action.get_human_info,
                                 args=(user_id, log_ad, db, cookie_canuse)).start()
                return

            rows = sql_tools.get_wechat_info(sql_session=sql_session, user_id=user_id)
            accounts = [
                {'service_name': service_name, 'wechat_name': wechat_name}
                for service_name, wechat_name in rows
            ]
            logging.info('ad_wechat_info result: %s', accounts)
            self.write({'status': {'msg': 'success', "RetCode": 200},
                        'wechat_info': accounts})
        except Exception:
            logging.exception('ad_wechat_info request failed')
            sql_session.rollback()
            # Drop any partially written body so the client receives one
            # well-formed JSON object.
            self.clear()
            self.write({'status': {'msg': 'internal error', "RetCode": 500}})
        finally:
            sql_session.close()
class delete_ad_layout(BaseHandler):
    """Delete a stored layout typesetting through ``sql_tools.delete_layout_typesetting_vir``."""

    def get(self):
        """Delete the layout ``layout_name`` of ``user_id``.

        Both query arguments are required; RetCode 400 is written when
        either is absent, RetCode 200 after the delete call returns.
        """
        user_id = self.get_argument('user_id', None)
        layout_name = self.get_argument('layout_name', None)
        if user_id is None or layout_name is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Open the session only after validation and always release it.
        sql_session = db.DBSession()
        try:
            sql_tools.delete_layout_typesetting_vir(sql_session=sql_session, user_id=user_id,
                                                    typesetting_name=layout_name)
        finally:
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200}})
class delete_ad_plan(BaseHandler):
    """Delete a stored ad-plan typesetting through ``sql_tools.delete_ad_plan_typesetting_vir``."""

    def get(self):
        """Delete the plan ``plan_name`` of ``user_id``.

        ``user_id`` and ``plan_name`` are required (RetCode 400 otherwise);
        ``service_name`` and ``wechat_name`` are passed through and may be
        None. RetCode 200 is written after the delete call returns.
        """
        user_id = self.get_argument('user_id', None)
        plan_name = self.get_argument('plan_name', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        if user_id is None or plan_name is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Open the session only after validation and always release it.
        sql_session = db.DBSession()
        try:
            sql_tools.delete_ad_plan_typesetting_vir(sql_session=sql_session, user_id=user_id,
                                                     typesetting_name=plan_name,
                                                     wechat_name=wechat_name,
                                                     service_name=service_name)
        finally:
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200}})
class get_ad_wechat_service_name(BaseHandler):
    """List the service names returned by ``sql_tools.get_wechat_info_service_name``."""

    def get(self):
        """Write ``wechat_info`` as a list of ``{'service_name': ...}`` objects.

        ``user_id`` is required; RetCode 400 is written when it is absent.
        """
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Open the session only after validation and always release it.
        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_wechat_info_service_name(sql_session=sql_session, user_id=user_id)
            services = [{'service_name': service_name} for service_name in rows]
        finally:
            sql_session.close()
        logging.info('get_ad_wechat_service_name result: %s', services)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'wechat_info': services})
class get_ad_wechat_wechat_name(BaseHandler):
    """List the accounts returned by ``sql_tools.get_wechat_info_wechat_name`` for one service."""

    def get(self):
        """Write ``wechat_info`` as a list of ``{'service_name', 'wechat_name'}`` objects.

        ``user_id`` and ``service_name`` are required; RetCode 400 is
        written when either is absent.
        """
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        if user_id is None or service_name is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Open the session only after validation and always release it.
        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_wechat_info_wechat_name(sql_session=sql_session, user_id=user_id,
                                                         service_name=service_name)
            accounts = [
                {'service_name': row_service_name, 'wechat_name': row_wechat_name}
                for row_service_name, row_wechat_name in rows
            ]
        finally:
            sql_session.close()
        logging.info('get_ad_wechat_wechat_name result: %s', accounts)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'wechat_info': accounts})
class get_plan_action_record(BaseHandler):
    """List plan action records returned by ``sql_tools.get_plan_record``."""

    def get(self):
        """Write the matching plan records as ``local_ad_plan_info``.

        ``user_id`` is required (RetCode 400 otherwise); ``service_name``,
        ``wechat_name``, ``status`` and ``plan_name`` are passed to the
        lookup as given and may be None. Each entry's ``id`` is the row's
        position in the query result.
        """
        user_id = self.get_argument('user_id', None)
        service_name = self.get_argument('service_name', None)
        wechat_name = self.get_argument('wechat_name', None)
        status = self.get_argument('status', None)
        plan_name = self.get_argument('plan_name', None)
        if user_id is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        # Open the session only after validation and always release it.
        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_plan_record(sql_session=sql_session, user_id=user_id,
                                             service_name=service_name, wechat_name=wechat_name,
                                             status=status, plan_name=plan_name)
            records = []
            # Row fields get their own names so the request arguments above
            # are not shadowed.
            for index, row in enumerate(rows):
                (_row_user_id, name, row_service_name, row_wechat_name,
                 create_time, row_status, typesetting, wechat_id_info) = row
                records.append({
                    'typesetting': json.loads(typesetting),
                    'ad_plan_name': name,
                    'id': index,
                    'create_time': create_time.strftime("%Y-%m-%d %H:%M:%S"),
                    'service_name': row_service_name,
                    'wechat_name': row_wechat_name,
                    'wechat_id_info': wechat_id_info,
                    'status': row_status,
                })
        finally:
            sql_session.close()
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': records})
class get_all_ad_task(BaseHandler):
    """Summarise every task of a user from the rows of ``sql_tools.get_ad_task``."""

    def get(self):
        """Write one summary per task name as ``local_ad_plan_info``.

        ``user_id`` is required (RetCode 400 otherwise). Rows are grouped by
        task name. Each summary carries:

        * ``task_info``: count per status plus ``sum_num``, their total;
        * ``status``: ``'todo'`` if the task has a ``'todo'`` status,
          otherwise ``'done'``;
        * ``create_time``: the formatted create time of the task's
          last-seen status;
        * ``channel``: always ``'wechat'``;
        * ``localtion``: ``'pyq'`` if any row of that task has
          ``plan_base[1] == 'pyq'``, otherwise ``''``;
        * ``id``: the task's position in the output.
        """
        user_id = self.get_argument('user_id', None)
        if user_id is None:
            self.write({'status': {'msg': 'url parameter error', "RetCode": 400}})
            return

        status_by_task = {}    # task_name -> {status: (count, create_time string)}
        location_by_task = {}  # task_name -> 'pyq' or ''
        sql_session = db.DBSession()
        try:
            rows = sql_tools.get_ad_task(sql_session=sql_session, user_id=user_id)
            for task_name, status, task_status_num, create_time, typesetting in rows:
                plan = json.loads(typesetting)
                # The location is tracked per task; a single shared value
                # would mark every task 'pyq' once one row is 'pyq'.
                location_by_task.setdefault(task_name, '')
                if plan['plan_base'][1] == 'pyq':
                    location_by_task[task_name] = 'pyq'
                status_by_task.setdefault(task_name, {})[status] = (
                    task_status_num, create_time.strftime("%Y-%m-%d %H:%M:%S"))
        finally:
            # Always release the session's connection.
            sql_session.close()

        # TODO: change to a sorted dict
        summaries = []
        for index, (task_name, by_status) in enumerate(status_by_task.items()):
            task_info = {status: count for status, (count, _created) in by_status.items()}
            # Decide the status before 'sum_num' is added to the same dict.
            task_state = 'todo' if 'todo' in task_info else 'done'
            task_info['sum_num'] = sum(count for count, _created in by_status.values())
            # by_status always holds at least one entry; use the last one.
            last_create_time = list(by_status.values())[-1][1]
            summaries.append({
                'task_name': task_name,
                'task_info': task_info,
                'create_time': last_create_time,
                'channel': 'wechat',
                'localtion': location_by_task[task_name],
                'id': index,
                'status': task_state,
            })
        logging.info('get_all_ad_task result: %s', summaries)
        self.write({'status': {'msg': 'success', "RetCode": 200},
                    'local_ad_plan_info': summaries})
def heart_jump():
    """Placeholder for a heartbeat check; currently does nothing."""
    # TODO: tornado heartbeat check, to be done next week -- a thread that keeps checking, with a 60-minute lifetime
    return None
def make_app():
    """Build the Tornado application with all routes, in debug/autoreload mode."""
    routes = [
        ("/get_all_ad_task", get_all_ad_task),  # status of all tasks
        ("/create_ad_plan", create_ad_plan),
        ("/get_ad_wechat_service_name", get_ad_wechat_service_name),
        ("/get_ad_wechat_wechat_name", get_ad_wechat_wechat_name),
        # ("/create_ad_plan_local", create_ad_plan_local),
        ("/create_ad_layout_local", create_ad_layout_local),
        ("/get_layout_local", get_ad_layout_local),
        ("/get_ad_plan_local", get_ad_plan_local),
        ("/delete_layout_local", delete_ad_layout),
        ("/delete_ad_plan_local", delete_ad_plan),
        # ("/create_ad_layout_remote", create_ad_layout_remote),
        ("/ad_human_info", ad_human_info),
        ("/ad_wechat_info", ad_wechat_info),
        ("/get_plan_action_record", get_plan_action_record),
    ]
    application = tornado.web.Application(routes, debug=True, autoreload=True)
    return application
if __name__ == "__main__":
    # Root logger: a size-rotated file (10 MiB, 5 backups) plus a console
    # stream. No second FileHandler is attached to the same 'tornado.log':
    # it would write every record twice and keep appending to the file the
    # rotating handler renames.
    logging.basicConfig(
        handlers=[
            handlers.RotatingFileHandler('./tornado.log',
                                         maxBytes=10 * 1024 * 1024,
                                         backupCount=5,
                                         encoding='utf-8'),
            logging.StreamHandler(),  # console output
        ],
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s %(filename)s %(funcName)s %(lineno)s - %(message)s"
    )
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()
|