Files

83 lines
2.6 KiB
Python
Raw Permalink Normal View History

"""FastAPI dependencies."""
from typing import AsyncGenerator
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.auth import decode_token, is_token_revoked
from app.core.database import get_db as _get_db
from app.core.rbac import require_admin
from app.core.redis import get_redis
from app.models.user import User
from app.schemas.user import TokenPayload
security = HTTPBearer(auto_error=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Yield async database session managed by FastAPI."""
async for session in _get_db():
yield session
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
db: AsyncSession = Depends(get_db),
) -> User:
"""Get current authenticated user from JWT access token."""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
try:
payload = decode_token(token, expected_type="access")
token_data = TokenPayload(**payload)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid authentication credentials: {exc}",
headers={"WWW-Authenticate": "Bearer"},
) from exc
if not token_data.sub or not token_data.jti:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
headers={"WWW-Authenticate": "Bearer"},
)
revoked = await is_token_revoked(token_data.jti)
if revoked:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked",
headers={"WWW-Authenticate": "Bearer"},
)
user = await db.get(User, token_data.sub)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user",
)
return user
async def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
"""Get current user and require admin role."""
return require_admin(current_user)