Autenticación segura en Python con FastAPI, SQLAlchemy y JWT (con refresh tokens)

python Autenticación segura en Python con FastAPI, SQLAlchemy y JWT (con refresh tokens)

Autenticación segura en Python con FastAPI, SQLAlchemy y JWT (con refresh tokens)

Proyecto práctico: una API mínima que implementa registro, login, renovación de tokens (refresh tokens con revocación) y endpoint protegido. Se centra en prácticas seguras: hashing de contraseñas, tokens con jti (identificador único), persistencia de refresh tokens para permitir revocación y token rotation.

Prerequisitos

  • Python 3.10+
  • Conocimientos básicos de FastAPI y SQLAlchemy
  • pip / virtualenv

Instalación rápida

python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install fastapi uvicorn sqlalchemy passlib[bcrypt] python-jose[cryptography] pydantic

O usando requirements.txt (incluido en la estructura):

pip install -r requirements.txt

Estructura de carpetas

project/
├─ app/
│  ├─ __init__.py
│  ├─ main.py
│  ├─ settings.py
│  ├─ db.py
│  ├─ models.py
│  ├─ schemas.py
│  ├─ auth.py
│  ├─ crud.py
│  └─ dependencies.py
└─ requirements.txt

Archivos principales (código completo)

app/settings.py

from pydantic import BaseSettings

class Settings(BaseSettings):
    SECRET_KEY: str = "change-me-please"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    DATABASE_URL: str = "sqlite:///./test.db"

settings = Settings()

app/db.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from .settings import settings

engine = create_engine(settings.DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

app/models.py

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from .db import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

class RefreshToken(Base):
    __tablename__ = "refresh_tokens"
    id = Column(Integer, primary_key=True, index=True)
    jti = Column(String, unique=True, index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    revoked = Column(Boolean, default=False)

app/schemas.py

from pydantic import BaseModel, EmailStr
from typing import Optional

class UserCreate(BaseModel):
    email: EmailStr
    password: str

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

class TokenPayload(BaseModel):
    sub: int
    exp: int
    jti: Optional[str]

class UserOut(BaseModel):
    id: int
    email: EmailStr
    is_active: bool

    class Config:
        orm_mode = True

class RefreshRequest(BaseModel):
    refresh_token: str

app/auth.py

import datetime
import uuid
from jose import jwt, JWTError
from passlib.context import CryptContext
from .settings import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(subject: int) -> str:
    expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    payload = {"sub": str(subject), "exp": expire}
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

def create_refresh_token(subject: int):
    expire = datetime.datetime.utcnow() + datetime.timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    jti = str(uuid.uuid4())
    payload = {"sub": str(subject), "exp": expire, "jti": jti}
    token = jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return token, jti, expire

def decode_token(token: str):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        return None

app/crud.py

from sqlalchemy.orm import Session
from . import models, schemas, auth
import datetime

def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()

def create_user(db: Session, user: schemas.UserCreate):
    hashed = auth.get_password_hash(user.password)
    db_user = models.User(email=user.email, hashed_password=hashed)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

def create_refresh_token_entry(db: Session, user_id: int, jti: str, expires_at: datetime.datetime):
    rt = models.RefreshToken(jti=jti, user_id=user_id, expires_at=expires_at)
    db.add(rt)
    db.commit()
    db.refresh(rt)
    return rt

def revoke_refresh_token(db: Session, jti: str):
    rt = db.query(models.RefreshToken).filter(models.RefreshToken.jti == jti).first()
    if rt:
        rt.revoked = True
        db.add(rt)
        db.commit()
    return rt

def is_refresh_token_valid(db: Session, jti: str, user_id: int) -> bool:
    rt = db.query(models.RefreshToken).filter(models.RefreshToken.jti == jti, models.RefreshToken.user_id == user_id).first()
    if not rt or rt.revoked:
        return False
    if rt.expires_at < datetime.datetime.utcnow():
        return False
    return True

app/dependencies.py

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from .db import SessionLocal
from .auth import decode_token
from . import models

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    payload = decode_token(token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    user_id = int(payload.get("sub"))
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user

app/main.py

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, crud, auth, db, dependencies, settings
from fastapi.security import OAuth2PasswordRequestForm
import datetime

app = FastAPI()

# Crear tablas (suficiente para desarrollo)
models.Base.metadata.create_all(bind=db.engine)

@app.post("/register", response_model=schemas.UserOut)
def register(user_in: schemas.UserCreate, db: Session = Depends(dependencies.get_db)):
    if crud.get_user_by_email(db, user_in.email):
        raise HTTPException(status_code=400, detail="Email already registered")
    user = crud.create_user(db, user_in)
    return user

@app.post("/login", response_model=schemas.Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(dependencies.get_db)):
    user = crud.get_user_by_email(db, form_data.username)
    if not user or not auth.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=400, detail="Incorrect credentials")
    access = auth.create_access_token(user.id)
    refresh, jti, expires_at = auth.create_refresh_token(user.id)
    crud.create_refresh_token_entry(db, user.id, jti, expires_at)
    return {"access_token": access, "refresh_token": refresh, "token_type": "bearer"}

@app.post("/refresh", response_model=schemas.Token)
def refresh(req: schemas.RefreshRequest, db: Session = Depends(dependencies.get_db)):
    payload = auth.decode_token(req.refresh_token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    user_id = int(payload.get("sub"))
    jti = payload.get("jti")
    if not jti or not crud.is_refresh_token_valid(db, jti, user_id):
        raise HTTPException(status_code=401, detail="Refresh token revoked or expired")
    # revoke previous refresh token (rotation)
    crud.revoke_refresh_token(db, jti)
    # issue new pair
    access = auth.create_access_token(user_id)
    refresh, new_jti, expires_at = auth.create_refresh_token(user_id)
    crud.create_refresh_token_entry(db, user_id, new_jti, expires_at)
    return {"access_token": access, "refresh_token": refresh, "token_type": "bearer"}

@app.get("/me", response_model=schemas.UserOut)
def me(current_user: models.User = Depends(dependencies.get_current_user)):
    return current_user

@app.post("/logout")
def logout(req: schemas.RefreshRequest, db: Session = Depends(dependencies.get_db)):
    payload = auth.decode_token(req.refresh_token)
    if payload:
        jti = payload.get("jti")
        crud.revoke_refresh_token(db, jti)
    return {"msg": "logged out"}

Por qué estas decisiones (no solo cómo)

  • Hashing con bcrypt (passlib): evita almacenar contraseñas en texto plano. bcrypt es coste-adaptativo.
  • Access token corto (p. ej. 15 min): reduce la ventana de ataque si se filtra el token.
  • Refresh tokens persistidos con jti: permite revocar refresh tokens y hacer token rotation (emitir uno nuevo al refrescar y revocar el anterior), mejorando seguridad ante robo.
  • Separación de responsabilidades (auth, crud, models, schemas): facilita pruebas y mantenimiento.
  • Usar JWT firmado (python-jose) y un SECRET_KEY fuerte en producción.

Ejecutar la API

uvicorn app.main:app --reload

Ejemplos rápidos (curl)

Registrar:

curl -X POST http://127.0.0.1:8000/register -H "Content-Type: application/json" -d '{"email":"user@example.com","password":"secret"}'

Login (OAuth2 password form):

curl -X POST http://127.0.0.1:8000/login -F 'username=user@example.com' -F 'password=secret'

Refrescar token:

curl -X POST http://127.0.0.1:8000/refresh -H "Content-Type: application/json" -d '{"refresh_token":"<REFRESH_TOKEN>"}'

Endpoint protegido (/me):

curl -H "Authorization: Bearer <ACCESS_TOKEN>" http://127.0.0.1:8000/me

Prácticas de seguridad y recomendaciones

  • No usar SECRET_KEY por defecto: configuración vía variables de entorno (override en Settings).
  • Usar HTTPS en producción para proteger tokens en tránsito.
  • Para aplicaciones web, preferible almacenar refresh tokens en cookies HttpOnly y SameSite para mitigar XSS y CSRF; nunca poner tokens en localStorage sin considerar riesgos.
  • Limitar intentos de inicio de sesión (rate limiting) y monitorizar tokens usados de forma sospechosa.

Este ejemplo es una base: para producción deberías añadir tests, migraciones (Alembic), logging centralizado, límites de concurrencia y validaciones extra.

Siguiente paso sugerido: implementar token rotation con persistencia del 'previous jti' para detectar reuse de refresh tokens — si un refresh token ya usado aparece de nuevo, invalidar todas las sesiones del usuario y forzar re-login.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión