API REST en Python con FastAPI, autenticación JWT y SQLite (proyecto práctico)
En este tutorial vas a construir una API REST completa con FastAPI que incluye:
- Registro de usuarios y login con contraseñas hasheadas
- Autenticación basada en JWT (JSON Web Tokens)
- CRUD básico de notas asociadas al usuario
- SQLite + SQLAlchemy como almacenamiento ligero
- Instrucciones para ejecutar localmente y en Docker
Requisitos previos
- Python 3.9+ (recomendado 3.10 o superior)
- pip, virtualenv (o venv)
- Conocimientos básicos de HTTP, JSON y Python
Estructura de carpetas
fastapi-jwt-sqlite/
├─ app/
│ ├─ __init__.py
│ ├─ main.py
│ ├─ database.py
│ ├─ models.py
│ ├─ schemas.py
│ ├─ crud.py
│ ├─ auth.py
│ └─ deps.py
├─ requirements.txt
├─ Dockerfile
└─ .env.example
Archivo requirements.txt
fastapi==0.95.2
uvicorn[standard]==0.22.0
SQLAlchemy==1.4.49
pydantic==1.10.7
python-jose==3.3.0
passlib[bcrypt]==1.7.4
python-dotenv==1.0.0
Notas: versiones indicadas son orientativas y estables a la fecha de este post. Ajusta según necesites.
1) database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./test.db')
# connect_args para SQLite: permite multithreading en dev
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False} if DATABASE_URL.startswith('sqlite') else {}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
2) models.py
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
notes = relationship('Note', back_populates='owner')
class Note(Base):
__tablename__ = 'notes'
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
content = Column(Text)
owner_id = Column(Integer, ForeignKey('users.id'))
owner = relationship('User', back_populates='notes')
3) schemas.py (Pydantic)
from pydantic import BaseModel
from typing import List, Optional
class NoteBase(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
class NoteCreate(NoteBase):
title: str
content: str
class NoteOut(NoteBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserCreate(BaseModel):
username: str
password: str
class UserOut(BaseModel):
id: int
username: str
class Config:
orm_mode = True
class Token(BaseModel):
access_token: str
token_type: str
4) auth.py (autenticación y helpers JWT)
from datetime import datetime, timedelta
from typing import Optional
import os
from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal
# Configuración: usa variables de entorno en producción
SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-change-me')
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
# Helpers de hashing
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
# JWT
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Dependency para obtener el usuario actual desde el token
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> models.User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='No se pudo validar las credenciales',
headers={'WWW-Authenticate': 'Bearer'},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = crud.get_user_by_username(db, username=username)
if user is None:
raise credentials_exception
return user
5) crud.py (operaciones sobre DB)
from sqlalchemy.orm import Session
from . import models, schemas, auth
# Users
def get_user_by_username(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def create_user(db: Session, user: schemas.UserCreate):
hashed = auth.get_password_hash(user.password)
db_user = models.User(username=user.username, hashed_password=hashed)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# Notes
def create_note_for_user(db: Session, note: schemas.NoteCreate, user_id: int):
db_note = models.Note(**note.dict(), owner_id=user_id)
db.add(db_note)
db.commit()
db.refresh(db_note)
return db_note
def get_notes_for_user(db: Session, user_id: int):
return db.query(models.Note).filter(models.Note.owner_id == user_id).all()
def get_note(db: Session, note_id: int):
return db.query(models.Note).filter(models.Note.id == note_id).first()
6) deps.py (dependencias comunes)
from .database import SessionLocal
# Ya tenemos get_db en auth.py; esto es opcional si prefieres centralizar deps
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
7) main.py (ruta principal)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
import os
from . import models, schemas, crud, auth
from .database import engine, Base
Base.metadata.create_all(bind=engine)
app = FastAPI(title='FastAPI JWT SQLite Example')
@app.post('/users/', response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(auth.get_db)):
db_user = crud.get_user_by_username(db, username=user.username)
if db_user:
raise HTTPException(status_code=400, detail='Username already registered')
return crud.create_user(db, user)
@app.post('/token', response_model=schemas.Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(auth.get_db)):
user = crud.get_user_by_username(db, username=form_data.username)
if not user:
raise HTTPException(status_code=400, detail='Incorrect username or password')
if not auth.verify_password(form_data.password, user.hashed_password):
raise HTTPException(status_code=400, detail='Incorrect username or password')
access_token_expires = timedelta(minutes=int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', '30')))
access_token = auth.create_access_token(data={'sub': user.username}, expires_delta=access_token_expires)
return {'access_token': access_token, 'token_type': 'bearer'}
@app.post('/notes/', response_model=schemas.NoteOut)
def create_note(note: schemas.NoteCreate, current_user: models.User = Depends(auth.get_current_user), db: Session = Depends(auth.get_db)):
return crud.create_note_for_user(db=db, note=note, user_id=current_user.id)
@app.get('/notes/', response_model=list[schemas.NoteOut])
def read_own_notes(current_user: models.User = Depends(auth.get_current_user), db: Session = Depends(auth.get_db)):
return crud.get_notes_for_user(db, user_id=current_user.id)
@app.get('/notes/{note_id}', response_model=schemas.NoteOut)
def read_note(note_id: int, current_user: models.User = Depends(auth.get_current_user), db: Session = Depends(auth.get_db)):
note = crud.get_note(db, note_id)
if note is None or note.owner_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Note not found')
return note
8) .env.example
SECRET_KEY=put-a-long-random-string-here
DATABASE_URL=sqlite:///./test.db
ACCESS_TOKEN_EXPIRE_MINUTES=30
9) Dockerfile (opcional)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Cómo ejecutar localmente
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -r requirements.txt
export SECRET_KEY='cambia-esta-clave-en-produccion' # Windows: set SECRET_KEY=...
uvicorn app.main:app --reload
La API quedará en http://127.0.0.1:8000. Abre http://127.0.0.1:8000/docs para la documentación interactiva (Swagger UI).
Flujo de uso (ejemplo)
- POST /users/ con JSON {"username":"alice", "password":"strongpass"} → crea usuario
- POST /token con form data username=alice y password=strongpass → obtienes access_token
- Usa Authorization: Bearer <token> para crear/leer notas
Explicación de por qué estas decisiones
- FastAPI: muy productiva, validación automática con Pydantic y documentación OpenAPI por defecto.
- SQLAlchemy: ORM maduro y flexible; SQLite para un desarrollo ligero y fácil de versionar en proyectos pequeños.
- python-jose: biblioteca sencilla para firmar y verificar JWT.
- passlib + bcrypt: hashing de contraseñas seguro y probado.
- OAuth2PasswordBearer: implementa el flujo estándar para intercambio de credenciales por tokens (Resource Owner Password Credentials con token endpoint).
- Separación en módulos (crud, auth, models, schemas): facilita pruebas y mantenibilidad.
Consideraciones de seguridad y producción
- No uses SECRET_KEY por defecto en producción; guárdalo en un gestor de secretos o variables de entorno seguras.
- Activa TLS (HTTPS) en front/proxy (Nginx / Cloud provider) para proteger tokens en tránsito.
- Valida longitudes y complejidad de contraseñas en el registro.
- Piensa en revocación/blacklist de tokens si necesitas logout forzado; un JWT con expiración corta + refresh tokens es habitual.
- Evita usar SQLite en entornos con alta concurrencia; cambia a PostgreSQL para producción.
Próximo paso sugerido: añade pruebas unitarias (pytest) para las rutas y la lógica de autenticación, y configura Alembic para migraciones de esquema.
Consejo avanzado: para una arquitectura más segura y escalable, cambia a JWT firmados con clave asimétrica (RS256) y usa un servidor de autorización centralizado (Keycloak, Auth0) o implementa refresh tokens, rotación de claves y revocación con Redis.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación