| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 | import simplejson as jsonimport tracebackfrom concurrent.futures import ThreadPoolExecutorfrom tornado.web import RequestHandlerfrom model.log import loggerimport timeimport base64import pandas as pdfrom model.DateUtils import DateUtilsfrom pandas.io.excel import ExcelWriterfrom pandas import read_csvlog = logger()class BaseHandler(RequestHandler, DateUtils):    def __init__(self, application, request, **kwargs):        RequestHandler.__init__(self, application, request, **kwargs)        self._status_code = 200        self.executor = ThreadPoolExecutor(200)        self.set_default_headers()        self._au = self.get_auth() if self.settings.get('auth') else True    def options(self):        # 返回方法1        self.set_status(200)        self.finish()    def set_default_headers(self):        super().set_default_headers()        # 设置允许的请求头        self.set_header("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")        self.set_header("X-XSS-Protecion", "1")        self.set_header("Content-Security-Policy", "default-src 'self'")        self.set_header("Access-Control-Allow-Credentials", "true")        # 设置一些自己定义的请求头        self.set_header("Access-Control-Allow-Headers",                        "Content-Type, Depth, User-Agent, Token, Origin, X-Requested-With, Accept, Authorization")        self.set_header("Content-Type", "application/json; charset=UTF-8")        self.set_header("Access-Control-Allow-Origin", "*")    def write_json_tmp_java(self, data, status_code=200, msg='SUCCESS', total=1, total_data={}):        self.write(json.dumps({'code': status_code, 'data': data, 'msg': msg}))    def write_json(self, data, status_code=200, msg='success', total=1, total_data={}):        self.write(json.dumps(            {'status': {'msg': msg, "RetCode": status_code}, 'total': total, 'data': data, "total_data": total_data}))    def write_fail(self, code=400, msg='error'):        self.write(json.dumps({'status': {'msg': msg, "RetCode": code}}))    def write_download(self, filename, data):        self.set_header('Content-Type', 'application/octet-stream')        self.set_header('Content-Disposition', f'attachment; filename={filename}.csv')        self.set_header("Pargam", "no-cache")        self.set_header("Cache-Control", "no-cache")        df = pd.DataFrame(data).to_csv(encoding='utf-8')        # print(df)        # with open(f'./{pitcher}_{start}_{end}.csv','w',newline='') as f:        #     f.write(df)        f = open(df)        data = read_csv(f)        data.to_excel('xxx.xlsx')        self.write(data)    def get_args(self):        di = json.loads(self.request.body.decode(encoding='utf-8'))        if isinstance(di, str):            di = json.loads(di)        return di    def write_error(self, status_code, msg=None, **kwargs):        if self.settings.get("serve_traceback") and "exc_info" in kwargs:            # in debug mode, try to send a traceback            lines = []            for line in traceback.format_exception(*kwargs["exc_info"]):                lines.append(line)            self.write_json(dict(traceback=''.join(lines)), status_code, self._reason)        elif msg:            self.write_json(None, status_code, msg)        else:            self.write_json(None, status_code, self._reason)    def _authentication(self):        """        :return: True, 认证通过, False 认证不通过        """        return True        log.info("author %s" % self.request.headers)        # log.info(self.request.remote_ip)        if self.request.headers.get("Gip_real") == '183.129.168.74':            return True        if not self.request.headers.get("Authorization"):            return False        else:            # redis 中判断值是否存在            # ur = UserRedisComm()            # key = "admin_account_check%s" % (self.request.headers.get("Authorization"))            # return True if ur.r.get(key) else False            return True    def get_auth(self):        # 不需要验证的请求        authless = ['/api/get_yangguang_data', '/api/git_hook/data_center', '/api/git_hook/qc_web']        url = self.request.full_url().split(str(self.settings.get('port')) + '/')[1]        if url in authless:            return True        else:            Authorization = self.request.headers.get('Authorization')            if not Authorization:                return False            print(Authorization)            try:                origStr = Authorization.split(' ')[1][::-1]            except:                return False            origStr += (4 - len(origStr) % 4) * "="            print(origStr)            print(float(base64.b64decode(origStr.encode('utf-8')).decode(encoding='utf-8', errors='ignore')))            b = str(float(base64.b64decode(origStr.encode('utf-8')).decode(encoding='utf-8', errors='ignore')) * int(                self.now.day))[:10]            print("b:", b)            diff = int(time.mktime(time.localtime())) - int(b)            print(diff)            if diff < 10:                return True            else:                return False
 |