security.py 2.05 KB
from flask import request, redirect, url_for
from functools import wraps
import bleach
from utils.logger import app_logger as logging

def sanitize_input(text):
    """清理用户输入,防止XSS攻击"""
    if text is None:
        return None
    return bleach.clean(str(text), strip=True)

def set_secure_headers(response):
    """设置安全响应头"""
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'SAMEORIGIN'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline';"
    return response

def require_https():
    """强制HTTPS中间件"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not request.is_secure and not getattr(request, 'is_localhost', False):
                # 使用 _external=True 和 _scheme='https' 生成完整的 HTTPS URL
                secure_url = url_for(
                    request.endpoint,
                    _external=True,
                    _scheme='https',
                    **request.view_args
                )
                # 添加查询参数
                if request.query_string:
                    secure_url = f"{secure_url}?{request.query_string.decode('utf-8')}"
                return redirect(secure_url, code=301)
            return f(*args, **kwargs)
        return decorated_function
    return decorator

def log_request_info():
    """请求日志记录中间件"""
    sanitized_headers = dict(request.headers)
    if 'Authorization' in sanitized_headers:
        sanitized_headers['Authorization'] = '[FILTERED]'
    if 'Cookie' in sanitized_headers:
        sanitized_headers['Cookie'] = '[FILTERED]'

    logging.info(
        f"Request: {request.method} {request.path}\n"
        f"Remote IP: {request.remote_addr}\n"
        f"Headers: {sanitized_headers}"
    )