From 3b07839e1746ee511c71a0709fc61e3ea5810d42 Mon Sep 17 00:00:00 2001 From: Miyamizu-MitsuhaSang <2510681107@qq.com> Date: Sat, 16 Aug 2025 22:21:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86oauth=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E7=9A=84=E7=94=A8=E6=88=B7=E5=92=8C=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=91=98=E8=AE=A4=E8=AF=81=E5=87=BD=E6=95=B0=EF=BC=8C=E5=90=8C?= =?UTF-8?q?=E6=97=B6=E5=BC=95=E5=85=A5USE=5FOAUTH=E6=9D=A5=E7=AE=A1?= =?UTF-8?q?=E6=8E=A7=E4=BD=BF=E7=94=A8=E7=9A=84=E5=87=BD=E6=95=B0=E7=89=88?= =?UTF-8?q?=E6=9C=AC=EF=BC=8C=E5=90=8E=E7=BB=AD=E4=BB=8Ebasic=E7=89=88?= =?UTF-8?q?=E6=9B=B4=E6=8D=A2=E4=B8=BAoauth=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/utils/security.py | 62 +++++++++++++++++++++++-------------------- settings.py | 8 +++++- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/app/utils/security.py b/app/utils/security.py index 1671827..0661b42 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -10,7 +10,7 @@ import redis.asyncio as redis from app.models.base import ReservedWords, User -from settings import SECRET_KEY +from settings import settings redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True) ALGORITHM = "HS256" @@ -95,7 +95,7 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]: raise HTTPException(status_code=401, detail="token 已失效") try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM]) except ExpiredSignatureError: raise HTTPException(status_code=401, detail="登陆信息已过期") except JWTError: @@ -111,39 +111,43 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]: # 从请求头中获取当前用户信息 -async def get_current_user(request: Request) -> Tuple[User, Dict]: - token = await _extract_bearer_token(request) - return await _decode_and_load_user(token) +async def get_current_user_basic(request: Request) -> Tuple[User, Dict]: + token = await _extract_bearer_token(request) + return await _decode_and_load_user(token) -async def is_admin_user(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]: - user, payload=user_payload +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login") + + +async def get_current_user_with_oauth( + token: Annotated[str, Depends(oauth2_scheme)] +) -> Tuple[User, Dict]: + return await _decode_and_load_user(token) + + +async def get_current_user(*args, **kwargs) -> Tuple[User, Dict]: + if settings.USE_OAUTH: + return await get_current_user_with_oauth(*args, **kwargs) + return await get_current_user_with_oauth(*args, **kwargs) + + +async def is_admin_user_basic(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]: + user, payload = user_payload if not getattr(user, "is_admin", False): raise HTTPException(status_code=403, detail="Access denied") return user, payload -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/logout") +async def is_admin_user_oauth( + user_payload: Tuple[User, Dict] = Depends(get_current_user_with_oauth) +) -> Tuple[User, Dict]: + user, payload = user_payload + if not getattr(user, "is_admin", False): + raise HTTPException(status_code=403, detail="Access denied") + return user, payload -async def get_current_user_with_OAuth(token: Annotated[str, Depends(oauth2_scheme)]): - # TODO OAuth验证 - # Redis 黑名单检查 - blacklisted = await redis_client.get(f"blacklist:{token}") - if blacklisted: - raise HTTPException(status_code=401, detail="Token 已失效") - - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - user_id = payload.get("user_id") - if not user_id: - raise HTTPException(status_code=401, detail="无效 token") - user = await User.get_or_none(id=user_id) - if not user: - raise HTTPException(status_code=401, detail="用户不存在") - - return user, payload - except ExpiredSignatureError: - raise HTTPException(status_code=401, detail="token 已过期") - except JWTError: - raise HTTPException(status_code=401, detail="") +async def is_admin_user(*args, **kwargs) -> Tuple[User, Dict]: + if settings.USE_OAUTH: + return await is_admin_user_basic(*args, **kwargs) + return await is_admin_user_oauth(*args, **kwargs) diff --git a/settings.py b/settings.py index bfd59af..2981ee6 100644 --- a/settings.py +++ b/settings.py @@ -1,3 +1,5 @@ +from pydantic.v1 import BaseSettings + TORTOISE_ORM = { 'connections': { 'default': { @@ -32,4 +34,8 @@ TORTOISE_ORM = { 'timezone': 'Asia/Shanghai' } -SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd" +class Settings(BaseSettings): + USE_OAUTH = False + SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd" + +settings = Settings()