Autenticación segura en Python con FastAPI, SQLAlchemy y JWT (con refresh tokens)
Proyecto práctico: una API mínima que implementa registro, login, renovación de tokens (refresh tokens con revocación) y endpoint protegido. Se centra en prácticas seguras: hashing de contraseñas, tokens con jti (identificador único), persistencia de refresh tokens para permitir revocación y token rotation.
Prerequisitos
- Python 3.10+
- Conocimientos básicos de FastAPI y SQLAlchemy
- pip / virtualenv
Instalación rápida
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install fastapi uvicorn sqlalchemy passlib[bcrypt] python-jose[cryptography] pydantic
O usando requirements.txt (incluido en la estructura):
pip install -r requirements.txt
Estructura de carpetas
project/
├─ app/
│ ├─ __init__.py
│ ├─ main.py
│ ├─ settings.py
│ ├─ db.py
│ ├─ models.py
│ ├─ schemas.py
│ ├─ auth.py
│ ├─ crud.py
│ └─ dependencies.py
└─ requirements.txt
Archivos principales (código completo)
app/settings.py
from pydantic import BaseSettings
class Settings(BaseSettings):
SECRET_KEY: str = "change-me-please"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
DATABASE_URL: str = "sqlite:///./test.db"
settings = Settings()
app/db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from .settings import settings
engine = create_engine(settings.DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from .db import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class RefreshToken(Base):
__tablename__ = "refresh_tokens"
id = Column(Integer, primary_key=True, index=True)
jti = Column(String, unique=True, index=True, nullable=False)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False)
revoked = Column(Boolean, default=False)
app/schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenPayload(BaseModel):
sub: int
exp: int
jti: Optional[str]
class UserOut(BaseModel):
id: int
email: EmailStr
is_active: bool
class Config:
orm_mode = True
class RefreshRequest(BaseModel):
refresh_token: str
app/auth.py
import datetime
import uuid
from jose import jwt, JWTError
from passlib.context import CryptContext
from .settings import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(subject: int) -> str:
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {"sub": str(subject), "exp": expire}
return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(subject: int):
expire = datetime.datetime.utcnow() + datetime.timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
jti = str(uuid.uuid4())
payload = {"sub": str(subject), "exp": expire, "jti": jti}
token = jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return token, jti, expire
def decode_token(token: str):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
app/crud.py
from sqlalchemy.orm import Session
from . import models, schemas, auth
import datetime
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def create_user(db: Session, user: schemas.UserCreate):
hashed = auth.get_password_hash(user.password)
db_user = models.User(email=user.email, hashed_password=hashed)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def create_refresh_token_entry(db: Session, user_id: int, jti: str, expires_at: datetime.datetime):
rt = models.RefreshToken(jti=jti, user_id=user_id, expires_at=expires_at)
db.add(rt)
db.commit()
db.refresh(rt)
return rt
def revoke_refresh_token(db: Session, jti: str):
rt = db.query(models.RefreshToken).filter(models.RefreshToken.jti == jti).first()
if rt:
rt.revoked = True
db.add(rt)
db.commit()
return rt
def is_refresh_token_valid(db: Session, jti: str, user_id: int) -> bool:
rt = db.query(models.RefreshToken).filter(models.RefreshToken.jti == jti, models.RefreshToken.user_id == user_id).first()
if not rt or rt.revoked:
return False
if rt.expires_at < datetime.datetime.utcnow():
return False
return True
app/dependencies.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from .db import SessionLocal
from .auth import decode_token
from . import models
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
payload = decode_token(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
user_id = int(payload.get("sub"))
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
app/main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, crud, auth, db, dependencies, settings
from fastapi.security import OAuth2PasswordRequestForm
import datetime
app = FastAPI()
# Crear tablas (suficiente para desarrollo)
models.Base.metadata.create_all(bind=db.engine)
@app.post("/register", response_model=schemas.UserOut)
def register(user_in: schemas.UserCreate, db: Session = Depends(dependencies.get_db)):
if crud.get_user_by_email(db, user_in.email):
raise HTTPException(status_code=400, detail="Email already registered")
user = crud.create_user(db, user_in)
return user
@app.post("/login", response_model=schemas.Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(dependencies.get_db)):
user = crud.get_user_by_email(db, form_data.username)
if not user or not auth.verify_password(form_data.password, user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect credentials")
access = auth.create_access_token(user.id)
refresh, jti, expires_at = auth.create_refresh_token(user.id)
crud.create_refresh_token_entry(db, user.id, jti, expires_at)
return {"access_token": access, "refresh_token": refresh, "token_type": "bearer"}
@app.post("/refresh", response_model=schemas.Token)
def refresh(req: schemas.RefreshRequest, db: Session = Depends(dependencies.get_db)):
payload = auth.decode_token(req.refresh_token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid refresh token")
user_id = int(payload.get("sub"))
jti = payload.get("jti")
if not jti or not crud.is_refresh_token_valid(db, jti, user_id):
raise HTTPException(status_code=401, detail="Refresh token revoked or expired")
# revoke previous refresh token (rotation)
crud.revoke_refresh_token(db, jti)
# issue new pair
access = auth.create_access_token(user_id)
refresh, new_jti, expires_at = auth.create_refresh_token(user_id)
crud.create_refresh_token_entry(db, user_id, new_jti, expires_at)
return {"access_token": access, "refresh_token": refresh, "token_type": "bearer"}
@app.get("/me", response_model=schemas.UserOut)
def me(current_user: models.User = Depends(dependencies.get_current_user)):
return current_user
@app.post("/logout")
def logout(req: schemas.RefreshRequest, db: Session = Depends(dependencies.get_db)):
payload = auth.decode_token(req.refresh_token)
if payload:
jti = payload.get("jti")
crud.revoke_refresh_token(db, jti)
return {"msg": "logged out"}
Por qué estas decisiones (no solo cómo)
- Hashing con bcrypt (passlib): evita almacenar contraseñas en texto plano. bcrypt es coste-adaptativo.
- Access token corto (p. ej. 15 min): reduce la ventana de ataque si se filtra el token.
- Refresh tokens persistidos con jti: permite revocar refresh tokens y hacer token rotation (emitir uno nuevo al refrescar y revocar el anterior), mejorando seguridad ante robo.
- Separación de responsabilidades (auth, crud, models, schemas): facilita pruebas y mantenimiento.
- Usar JWT firmado (python-jose) y un SECRET_KEY fuerte en producción.
Ejecutar la API
uvicorn app.main:app --reload
Ejemplos rápidos (curl)
Registrar:
curl -X POST http://127.0.0.1:8000/register -H "Content-Type: application/json" -d '{"email":"user@example.com","password":"secret"}'
Login (OAuth2 password form):
curl -X POST http://127.0.0.1:8000/login -F 'username=user@example.com' -F 'password=secret'
Refrescar token:
curl -X POST http://127.0.0.1:8000/refresh -H "Content-Type: application/json" -d '{"refresh_token":"<REFRESH_TOKEN>"}'
Endpoint protegido (/me):
curl -H "Authorization: Bearer <ACCESS_TOKEN>" http://127.0.0.1:8000/me
Prácticas de seguridad y recomendaciones
- No usar SECRET_KEY por defecto: configuración vía variables de entorno (override en Settings).
- Usar HTTPS en producción para proteger tokens en tránsito.
- Para aplicaciones web, preferible almacenar refresh tokens en cookies HttpOnly y SameSite para mitigar XSS y CSRF; nunca poner tokens en localStorage sin considerar riesgos.
- Limitar intentos de inicio de sesión (rate limiting) y monitorizar tokens usados de forma sospechosa.
Este ejemplo es una base: para producción deberías añadir tests, migraciones (Alembic), logging centralizado, límites de concurrencia y validaciones extra.
Siguiente paso sugerido: implementar token rotation con persistencia del 'previous jti' para detectar reuse de refresh tokens — si un refresh token ya usado aparece de nuevo, invalidar todas las sesiones del usuario y forzar re-login.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación