123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import simplejson as json
- import traceback
- from concurrent.futures import ThreadPoolExecutor
- from tornado.web import RequestHandler
- from model.log import logger
- import time
- import base64
- import pandas as pd
- from model.DateUtils import DateUtils
- from pandas.io.excel import ExcelWriter
- log = logger()
- class BaseHandler(RequestHandler, DateUtils):
- def __init__(self, application, request, **kwargs):
- RequestHandler.__init__(self, application, request, **kwargs)
- self._status_code = 200
- self.executor = ThreadPoolExecutor(200)
- self.set_default_headers()
- self._au = self.get_auth() if self.settings.get('auth') else True
- def options(self):
- # 返回方法1
- self.set_status(200)
- self.finish()
- def set_default_headers(self):
- super().set_default_headers()
- # 设置允许的请求头
- self.set_header("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
- self.set_header("X-XSS-Protecion", "1")
- self.set_header("Content-Security-Policy", "default-src 'self'")
- self.set_header("Access-Control-Allow-Credentials", "true")
- # 设置一些自己定义的请求头
- self.set_header("Access-Control-Allow-Headers",
- "Content-Type, Depth, User-Agent, Token, Origin, X-Requested-With, Accept, Authorization")
- self.set_header("Content-Type", "application/json; charset=UTF-8")
- self.set_header("Access-Control-Allow-Origin", "*")
- def write_json_tmp_java(self, data, status_code=200, msg='SUCCESS', total=1, total_data={}):
- self.write(json.dumps({'code': status_code, 'data': data, 'msg': msg}))
- def write_json(self, data, status_code=200, msg='success', total=1, total_data={}):
- self.write(json.dumps(
- {'status': {'msg': msg, "RetCode": status_code}, 'total': total, 'data': data, "total_data": total_data}))
- def write_fail(self, code=400, msg='error'):
- self.write(json.dumps({'status': {'msg': msg, "RetCode": code}}))
- def write_download(self, filename, data):
- self.set_header('Content-Type', 'application/octet-stream')
- self.set_header('Content-Disposition', f'attachment; filename={filename}.csv')
- self.set_header("Pargam", "no-cache")
- self.set_header("Cache-Control", "no-cache")
- df = pd.DataFrame(data).to_csv(encoding='utf-8')
- print(df)
- # with open(f'./{pitcher}_{start}_{end}.csv','w',newline='') as f:
- # f.write(df)
- with ExcelWriter("xxxx.xlsx") as ew:
- df.to_excel(ew,sheet_name="1",index=False)
- self.write(df)
- def get_args(self):
- di = json.loads(self.request.body.decode(encoding='utf-8'))
- if isinstance(di, str):
- di = json.loads(di)
- return di
- def write_error(self, status_code, msg=None, **kwargs):
- if self.settings.get("serve_traceback") and "exc_info" in kwargs:
- # in debug mode, try to send a traceback
- lines = []
- for line in traceback.format_exception(*kwargs["exc_info"]):
- lines.append(line)
- self.write_json(dict(traceback=''.join(lines)), status_code, self._reason)
- elif msg:
- self.write_json(None, status_code, msg)
- else:
- self.write_json(None, status_code, self._reason)
- def _authentication(self):
- """
- :return: True, 认证通过, False 认证不通过
- """
- return True
- log.info("author %s" % self.request.headers)
- # log.info(self.request.remote_ip)
- if self.request.headers.get("Gip_real") == '183.129.168.74':
- return True
- if not self.request.headers.get("Authorization"):
- return False
- else:
- # redis 中判断值是否存在
- # ur = UserRedisComm()
- # key = "admin_account_check%s" % (self.request.headers.get("Authorization"))
- # return True if ur.r.get(key) else False
- return True
- def get_auth(self):
- # 不需要验证的请求
- authless = ['/api/get_yangguang_data', '/api/git_hook/data_center', '/api/git_hook/qc_web']
- url = self.request.full_url().split(str(self.settings.get('port')) + '/')[1]
- if url in authless:
- return True
- else:
- Authorization = self.request.headers.get('Authorization')
- if not Authorization:
- return False
- print(Authorization)
- try:
- origStr = Authorization.split(' ')[1][::-1]
- except:
- return False
- origStr += (4 - len(origStr) % 4) * "="
- print(origStr)
- print(float(base64.b64decode(origStr.encode('utf-8')).decode(encoding='utf-8', errors='ignore')))
- b = str(float(base64.b64decode(origStr.encode('utf-8')).decode(encoding='utf-8', errors='ignore')) * int(
- self.now.day))[:10]
- print("b:", b)
- diff = int(time.mktime(time.localtime())) - int(b)
- print(diff)
- if diff < 10:
- return True
- else:
- return False
|