diff --git a/app/utils/security.py b/app/utils/security.py index 1671827..0661b42 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -10,7 +10,7 @@ import redis.asyncio as redis from app.models.base import ReservedWords, User -from settings import SECRET_KEY +from settings import settings redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True) ALGORITHM = "HS256" @@ -95,7 +95,7 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]: raise HTTPException(status_code=401, detail="token 已失效") try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM]) except ExpiredSignatureError: raise HTTPException(status_code=401, detail="登陆信息已过期") except JWTError: @@ -111,39 +111,43 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]: # 从请求头中获取当前用户信息 -async def get_current_user(request: Request) -> Tuple[User, Dict]: - token = await _extract_bearer_token(request) - return await _decode_and_load_user(token) +async def get_current_user_basic(request: Request) -> Tuple[User, Dict]: + token = await _extract_bearer_token(request) + return await _decode_and_load_user(token) -async def is_admin_user(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]: - user, payload=user_payload +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login") + + +async def get_current_user_with_oauth( + token: Annotated[str, Depends(oauth2_scheme)] +) -> Tuple[User, Dict]: + return await _decode_and_load_user(token) + + +async def get_current_user(*args, **kwargs) -> Tuple[User, Dict]: + if settings.USE_OAUTH: + return await get_current_user_with_oauth(*args, **kwargs) + return await get_current_user_with_oauth(*args, **kwargs) + + +async def is_admin_user_basic(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]: + user, payload = user_payload if not getattr(user, "is_admin", False): raise HTTPException(status_code=403, detail="Access denied") return user, payload -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/logout") +async def is_admin_user_oauth( + user_payload: Tuple[User, Dict] = Depends(get_current_user_with_oauth) +) -> Tuple[User, Dict]: + user, payload = user_payload + if not getattr(user, "is_admin", False): + raise HTTPException(status_code=403, detail="Access denied") + return user, payload -async def get_current_user_with_OAuth(token: Annotated[str, Depends(oauth2_scheme)]): - # TODO OAuth验证 - # Redis 黑名单检查 - blacklisted = await redis_client.get(f"blacklist:{token}") - if blacklisted: - raise HTTPException(status_code=401, detail="Token 已失效") - - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - user_id = payload.get("user_id") - if not user_id: - raise HTTPException(status_code=401, detail="无效 token") - user = await User.get_or_none(id=user_id) - if not user: - raise HTTPException(status_code=401, detail="用户不存在") - - return user, payload - except ExpiredSignatureError: - raise HTTPException(status_code=401, detail="token 已过期") - except JWTError: - raise HTTPException(status_code=401, detail="") +async def is_admin_user(*args, **kwargs) -> Tuple[User, Dict]: + if settings.USE_OAUTH: + return await is_admin_user_basic(*args, **kwargs) + return await is_admin_user_oauth(*args, **kwargs) diff --git a/settings.py b/settings.py index bfd59af..2981ee6 100644 --- a/settings.py +++ b/settings.py @@ -1,3 +1,5 @@ +from pydantic.v1 import BaseSettings + TORTOISE_ORM = { 'connections': { 'default': { @@ -32,4 +34,8 @@ TORTOISE_ORM = { 'timezone': 'Asia/Shanghai' } -SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd" +class Settings(BaseSettings): + USE_OAUTH = False + SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd" + +settings = Settings()