import base64
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import date

import pandas as pd
import simplejson as json
from tornado.web import RequestHandler

from model.log import logger
from model.common import errors
from config.config import auth, port
from model.DateUtils import DateUtils

log = logger()


class BaseHandler(RequestHandler, DateUtils):
    """Base Tornado handler providing JSON/CSV response helpers and token auth.

    On construction the handler applies the default response headers and, when
    the ``auth`` config flag is truthy, stores the result of ``_auth()`` in
    ``self._au``. Nothing in this class reads ``_au``; presumably subclasses
    check it before serving a request.
    """

    # Request paths that skip the token check in ``_auth``.
    _AUTHLESS_PATHS = frozenset((
        '/api/get_yangguang_data',
        '/api/git_hook/data_center',
        '/api/git_hook/qc_web',
    ))

    # Shared by every handler instance. The original created a new 200-worker
    # pool per request and never shut it down; ``self.executor`` still resolves
    # to this attribute, so existing call sites keep working.
    executor = ThreadPoolExecutor(200)

    def __init__(self, application, request, **kwargs):
        """Initialise the handler, set default headers and run the auth check."""
        RequestHandler.__init__(self, application, request, **kwargs)
        self._status_code = 200
        self.set_default_headers()
        self._au = True
        if auth:
            self._au = self._auth()

    def options(self):
        """Answer OPTIONS requests with an empty 200 response."""
        self.set_status(200)
        self.finish()

    def set_default_headers(self):
        """Set the CORS, security and content-type headers sent on every response."""
        super().set_default_headers()
        # Allowed request methods.
        self.set_header("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
        # Header name was misspelled ("X-XSS-Protecion"), so it had no effect.
        self.set_header("X-XSS-Protection", "1")
        self.set_header("Content-Security-Policy", "default-src 'self'")
        self.set_header("Access-Control-Allow-Credentials", "true")
        # Custom request headers that clients are allowed to send.
        self.set_header(
            "Access-Control-Allow-Headers",
            "Content-Type, Depth, User-Agent, Token, Origin, X-Requested-With, Accept, Authorization",
        )
        self.set_header("Content-Type", "application/json; charset=UTF-8")
        # NOTE(review): browsers reject a wildcard origin on credentialed
        # requests; if cookies/credentials are really needed, echo the request
        # Origin instead of "*". Left unchanged because it is a policy decision.
        self.set_header("Access-Control-Allow-Origin", "*")

    def write_json(self, data, status_code=200, msg='success', total=1, total_data=None):
        """Write the standard JSON envelope to the response body.

        ``status_code`` is only embedded in the payload as ``RetCode``; the
        HTTP status of the response is not changed here.

        :param data: payload stored under ``data``.
        :param status_code: value stored under ``status.RetCode``.
        :param msg: value stored under ``status.msg``.
        :param total: value stored under ``total``.
        :param total_data: value stored under ``total_data``; ``None`` (the
            default) is serialized as an empty object.
        """
        if total_data is None:
            total_data = {}
        self.write(json.dumps({
            'status': {'msg': msg, "RetCode": status_code},
            'total': total,
            'data': data,
            "total_data": total_data,
        }))

    def write_fail(self, code=400, msg='error'):
        """Write a JSON envelope that contains only the ``status`` section."""
        self.write(json.dumps({'status': {'msg': msg, "RetCode": code}}))

    def write_download(self, filename, data):
        """Send ``data`` to the client as a CSV attachment named ``<filename>.csv``.

        :param filename: attachment name without the ``.csv`` extension.
        :param data: anything accepted by the ``pandas.DataFrame`` constructor.
        """
        self.set_header('Content-Type', 'application/octet-stream')
        self.set_header('Content-Disposition', f'attachment; filename={filename}.csv')
        # Header name was misspelled ("Pargam"); the intended header is Pragma.
        self.set_header("Pragma", "no-cache")
        self.set_header("Cache-Control", "no-cache")
        # With no path argument, to_csv returns the CSV text.
        csv_text = pd.DataFrame(data).to_csv(encoding='utf-8')
        self.write(csv_text)

    def get_args(self):
        """Return the request body parsed as JSON.

        A body that decodes to a JSON string is parsed a second time, which
        handles double-encoded payloads. Decoder errors for an empty or invalid
        body are not caught here and propagate to the caller.
        """
        parsed = json.loads(self.request.body.decode(encoding='utf-8'))
        if isinstance(parsed, str):
            parsed = json.loads(parsed)
        return parsed

    def write_error(self, status_code, msg=None, **kwargs):
        """Render an error response using the JSON envelope.

        When the ``serve_traceback`` setting is on and ``exc_info`` is
        supplied, the formatted traceback is returned as the payload.
        Otherwise ``msg`` is used if given, falling back to ``self._reason``.
        """
        if self.settings.get("serve_traceback") and "exc_info" in kwargs:
            # Debug mode: send the traceback back to the client.
            trace = ''.join(traceback.format_exception(*kwargs["exc_info"]))
            self.write_json(dict(traceback=trace), status_code, self._reason)
        elif msg:
            self.write_json(None, status_code, msg)
        else:
            self.write_json(None, status_code, self._reason)

    def _authentication(self):
        """Report whether the request is authenticated.

        :return: True when authentication passes, False when it fails.
            The check is currently disabled, so this always returns True.
        """
        # NOTE(review): the original body contained header checks (a trusted
        # "Gip_real" address and the presence of "Authorization") placed after
        # an unconditional `return True`, so they never executed. That dead
        # code was removed; add real checks here if this hook should enforce
        # anything.
        return True

    def _auth(self):
        """Validate the time-based token carried in the ``Authorization`` header.

        Requests whose path is listed in ``_AUTHLESS_PATHS`` are accepted
        without a token. Otherwise the header value is reversed, base64
        decoded, parsed as a float and multiplied by ``self.now.day``
        (``now`` is not defined in this class; presumably it comes from
        DateUtils). The first ten characters of the result are read as epoch
        seconds, and the token is accepted when that instant is less than 60
        seconds before the server's current time.

        :return: True if the request may proceed, False otherwise (including
            when the header is missing or malformed).
        """
        # Compare the request path directly. The original split full_url() on
        # "<port>/", which dropped the leading "/" (so the whitelist could
        # never match) and raised IndexError when the port was not in the URL.
        if self.request.path in self._AUTHLESS_PATHS:
            return True

        token = self.request.headers.get('Authorization')
        if not token:
            return False

        encoded = token[::-1]
        # Base64 text must be a multiple of 4 characters long; the original
        # padded on len % 3, which rejected some valid unpadded tokens.
        encoded += "=" * (-len(encoded) % 4)
        try:
            decoded = base64.b64decode(encoded.encode('utf-8')).decode(
                encoding='utf-8', errors='ignore')
            issued_at = int(str(float(decoded) * int(self.now.day))[:10])
        except ValueError:
            # binascii.Error subclasses ValueError: covers bad base64 as well
            # as non-numeric content. Treat as failed auth, not a server error.
            return False

        diff = int(time.time()) - issued_at
        log.info("auth token age: %s" % diff)
        # NOTE(review): a future-dated token yields a negative diff and is
        # accepted; consider bounding the window on both sides.
        return diff < 60