更新了oauth版本的用户和管理员认证函数,同时引入USE_OAUTH来管控使用的函数版本,后续从basic版更换为oauth版

This commit is contained in:
Miyamizu-MitsuhaSang 2025-08-16 22:21:07 +08:00
parent 7ecfa03c7f
commit 3b07839e17
2 changed files with 40 additions and 30 deletions

View File

@ -10,7 +10,7 @@ import redis.asyncio as redis
from app.models.base import ReservedWords, User
from settings import SECRET_KEY
from settings import settings
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
ALGORITHM = "HS256"
@ -95,7 +95,7 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]:
raise HTTPException(status_code=401, detail="token 已失效")
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
except ExpiredSignatureError:
raise HTTPException(status_code=401, detail="登陆信息已过期")
except JWTError:
@ -111,39 +111,43 @@ async def _decode_and_load_user(token: str) -> Tuple[User, Dict]:
# 从请求头中获取当前用户信息
async def get_current_user(request: Request) -> Tuple[User, Dict]:
token = await _extract_bearer_token(request)
return await _decode_and_load_user(token)
async def get_current_user_basic(request: Request) -> Tuple[User, Dict]:
token = await _extract_bearer_token(request)
return await _decode_and_load_user(token)
async def is_admin_user(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]:
user, payload=user_payload
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/login")
async def get_current_user_with_oauth(
token: Annotated[str, Depends(oauth2_scheme)]
) -> Tuple[User, Dict]:
return await _decode_and_load_user(token)
async def get_current_user(*args, **kwargs) -> Tuple[User, Dict]:
if settings.USE_OAUTH:
return await get_current_user_with_oauth(*args, **kwargs)
return await get_current_user_with_oauth(*args, **kwargs)
async def is_admin_user_basic(user_payload: Tuple[User, Dict] = Depends(get_current_user)) -> Tuple[User, Dict]:
user, payload = user_payload
if not getattr(user, "is_admin", False):
raise HTTPException(status_code=403, detail="Access denied")
return user, payload
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/users/logout")
async def is_admin_user_oauth(
user_payload: Tuple[User, Dict] = Depends(get_current_user_with_oauth)
) -> Tuple[User, Dict]:
user, payload = user_payload
if not getattr(user, "is_admin", False):
raise HTTPException(status_code=403, detail="Access denied")
return user, payload
async def get_current_user_with_OAuth(token: Annotated[str, Depends(oauth2_scheme)]):
# TODO OAuth验证
# Redis 黑名单检查
blacklisted = await redis_client.get(f"blacklist:{token}")
if blacklisted:
raise HTTPException(status_code=401, detail="Token 已失效")
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="无效 token")
user = await User.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=401, detail="用户不存在")
return user, payload
except ExpiredSignatureError:
raise HTTPException(status_code=401, detail="token 已过期")
except JWTError:
raise HTTPException(status_code=401, detail="")
async def is_admin_user(*args, **kwargs) -> Tuple[User, Dict]:
if settings.USE_OAUTH:
return await is_admin_user_basic(*args, **kwargs)
return await is_admin_user_oauth(*args, **kwargs)

View File

@ -1,3 +1,5 @@
from pydantic.v1 import BaseSettings
TORTOISE_ORM = {
'connections': {
'default': {
@ -32,4 +34,8 @@ TORTOISE_ORM = {
'timezone': 'Asia/Shanghai'
}
SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd"
class Settings(BaseSettings):
USE_OAUTH = False
SECRET_KEY = "asdasdasd-odjfnsodfnosidnfdf-0oq2j01j0jf0i1ej0fij10fd"
settings = Settings()