Proyecto práctico: Auditoría de cambios en PostgreSQL con triggers y API en Flask
En este tutorial vas a construir un sistema de auditoría que registra INSERT, UPDATE y DELETE en tablas críticas de PostgreSQL usando funciones y triggers. Además publicaremos una API sencilla en Flask para consultar los registros de auditoría. Ideal para trazabilidad, cumplimiento y para facilitar debugging en producción.
Requisitos previos
- PostgreSQL 12+ (local o en contenedor)
- Python 3.8+
- pip y virtualenv
- Conocimientos básicos de SQL y Python
Estructura de carpetas
audit-project/
├─ sql/
│ ├─ schema.sql
│ └─ audit_triggers.sql
├─ app.py
├─ requirements.txt
└─ .env.example
Qué vas a crear
- Tabla ejemplo
users. - Tabla de auditoría genérica
audit.log_entriesque guarda metadata, operación, datos anteriores y nuevos (JSON). - Función PL/pgSQL que inserta en
audit.log_entriesdesde triggers. - Triggers para INSERT/UPDATE/DELETE en la tabla
users. - API Flask con endpoints para consultar logs por tabla, por id de fila y por rango de fechas.
Por qué usar este enfoque
Usar triggers a nivel de base de datos garantiza que ninguna aplicación omita la auditoría (consistencia). Guardar old y new como JSON permite reconstruir cambios y mantiene flexibilidad para cambios de esquema. Separar la tabla de auditoría en un schema dedicado (ej. audit) mantiene el modelo ordenado.
SQL: esquema y triggers
Archivo: sql/schema.sql
-- sql/schema.sql
CREATE SCHEMA IF NOT EXISTS audit;
-- Tabla de ejemplo
CREATE TABLE IF NOT EXISTS public.users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Tabla de auditoría
CREATE TABLE IF NOT EXISTS audit.log_entries (
id BIGSERIAL PRIMARY KEY,
schema_name TEXT NOT NULL,
table_name TEXT NOT NULL,
row_id TEXT,
operation CHAR(1) NOT NULL, -- I/U/D
changed_by TEXT,
changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
old_data JSONB,
new_data JSONB,
statement_text TEXT -- opcional, para query info si se desea
);
-- Índices para consultas frecuentes
CREATE INDEX IF NOT EXISTS idx_audit_table ON audit.log_entries (schema_name, table_name);
CREATE INDEX IF NOT EXISTS idx_audit_rowid ON audit.log_entries (table_name, row_id);
CREATE INDEX IF NOT EXISTS idx_audit_changed_at ON audit.log_entries (changed_at DESC);
Archivo: sql/audit_triggers.sql
-- sql/audit_triggers.sql
-- Función genérica de auditoría
CREATE OR REPLACE FUNCTION audit.log_change() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
v_old JSONB;
v_new JSONB;
v_rowid TEXT;
BEGIN
-- Serializar old/new a JSONB si existen
IF TG_OP = 'DELETE' THEN
v_old := to_jsonb(OLD);
v_new := NULL;
v_rowid := COALESCE(OLD.id::text, NULL);
ELSIF TG_OP = 'INSERT' THEN
v_old := NULL;
v_new := to_jsonb(NEW);
v_rowid := COALESCE(NEW.id::text, NULL);
ELSE -- UPDATE
v_old := to_jsonb(OLD);
v_new := to_jsonb(NEW);
v_rowid := COALESCE(NEW.id::text, OLD.id::text);
END IF;
-- Insertar entrada de audit
INSERT INTO audit.log_entries(
schema_name, table_name, row_id, operation, changed_by, changed_at, old_data, new_data, statement_text
) VALUES (
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
v_rowid,
substr(TG_OP,1,1),
current_setting('audit.current_user', true), -- valor opcional inyectado por app
now(),
v_old,
v_new,
NULL
);
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$;
-- Crear trigger para la tabla users
DROP TRIGGER IF EXISTS users_audit_trg ON public.users;
CREATE TRIGGER users_audit_trg
AFTER INSERT OR UPDATE OR DELETE ON public.users
FOR EACH ROW EXECUTE FUNCTION audit.log_change();
Archivo .env.example
# .env.example
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
FLASK_ENV=development
API en Flask
Archivo: requirements.txt
Flask==2.2.5
psycopg2-binary==2.9.6
python-dotenv==1.0.0
Archivo principal: app.py
# app.py
import os
import json
from datetime import datetime
from flask import Flask, request, jsonify, abort
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv('DATABASE_URL')
if not DATABASE_URL:
raise RuntimeError('DATABASE_URL must be set')
app = Flask(__name__)
def get_conn():
return psycopg2.connect(DATABASE_URL)
@app.route('/audits', methods=['GET'])
def list_audits():
"""
Parámetros opcionales:
- table: nombre de tabla (ej. users)
- row_id: id de fila
- from: fecha ISO inicio
- to: fecha ISO fin
- limit: int
"""
table = request.args.get('table')
row_id = request.args.get('row_id')
date_from = request.args.get('from')
date_to = request.args.get('to')
limit = int(request.args.get('limit') or 100)
sql = ["SELECT id, schema_name, table_name, row_id, operation, changed_by, changed_at, old_data, new_data FROM audit.log_entries WHERE 1=1"]
params = []
if table:
sql.append("AND table_name = %s")
params.append(table)
if row_id:
sql.append("AND row_id = %s")
params.append(row_id)
if date_from:
sql.append("AND changed_at >= %s")
params.append(date_from)
if date_to:
sql.append("AND changed_at <= %s")
params.append(date_to)
sql.append("ORDER BY changed_at DESC LIMIT %s")
params.append(limit)
query = ' '.join(sql)
with get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, params)
rows = cur.fetchall()
# Convertir JSONB a objetos JSON serializables
for r in rows:
if r['old_data'] is not None and not isinstance(r['old_data'], (dict, list)):
r['old_data'] = json.loads(r['old_data'])
if r['new_data'] is not None and not isinstance(r['new_data'], (dict, list)):
r['new_data'] = json.loads(r['new_data'])
return jsonify(rows)
@app.route('/audits/', methods=['GET'])
def get_audit(audit_id):
with get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute('SELECT * FROM audit.log_entries WHERE id = %s', (audit_id,))
row = cur.fetchone()
if not row:
abort(404)
# convertir JSONB
if row['old_data'] is not None and not isinstance(row['old_data'], (dict, list)):
row['old_data'] = json.loads(row['old_data'])
if row['new_data'] is not None and not isinstance(row['new_data'], (dict, list)):
row['new_data'] = json.loads(row['new_data'])
return jsonify(row)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=os.getenv('FLASK_ENV') == 'development')
Cómo desplegarlo localmente
- Crear y activar virtualenv, instalar dependencias:
python -m venv venv && source venv/bin/activate && pip install -r requirements.txt - Configura
.envcon tuDATABASE_URL. - Ejecuta SQL para crear esquema y triggers:
psql $DATABASE_URL -f sql/schema.sqlpsql $DATABASE_URL -f sql/audit_triggers.sql - Arranca la app:
python app.py - Prueba: insertar un usuario y consultar API:
- Insertar:INSERT INTO public.users (username,email) VALUES ('alice','alice@example.com');- Consultar:GET http://localhost:5000/audits?table=users
Explicaciones clave (el porqué)
- Tabla
audit.log_entriesalmacenaold_dataynew_datacomo JSONB porque: soporta cambios de esquema sin migrar la tabla de auditoría y facilita comparaciones y reconstrucción. - La función
audit.log_change()se ejecuta AFTER para no interferir con la lógica de la operación; así la transacción original no cambia retornando NEW/OLD según corresponda. - Usamos
current_setting('audit.current_user', true)para permitir que la aplicación inyecte el usuario que causa el cambio (ver nota más abajo). Si la variable no existe, devuelve NULL. Esto evita depender solo de la conexión DB para el actor. - Índices en columnas (table_name, row_id, changed_at) aceleran consultas más frecuentes al API.
Inyectar el usuario que hace el cambio
Si tu aplicación usa una conexión por request (pool), puedes ejecutar antes de la operación:
SET LOCAL audit.current_user = 'app_user:123';
-- luego realizar UPDATE/INSERT/DELETE en la misma transacción
Usar SET LOCAL dentro de la transacción evita persistir información entre conexiones.
Rendimiento y consideraciones
- Triggers generan overhead. Para tablas con alta tasa de escritura considera:
- Escribir a una cola (ej. Kafka) en lugar de sincronizar en la misma transacción si la latencia es crítica.
- Limitar columnas auditoradas: serializa solo columnas relevantes o un diff en vez de todo el row si los objetos son grandes.
- Archivar logs antiguos: particiona
audit.log_entriespor fecha o mueve a almacenamiento frío periódicamente.
Seguridad y cumplimiento
- Protege acceso a la tabla
audit.log_entriescon roles y políticas (GRANT SELECT a equipos de auditoría, revoca a otros). - Si almacenas PII en
old_data/new_data, considera enmascarar o cifrar campos sensibles antes de guardar. - Auditoría puede crear datos duplicados de PII: revisa políticas de retención (GDPR).
Posibles mejoras
- Agregar endpoint que calcule un diff entre old/new y muestre solo campos cambiados.
- Endpoint para revertir una fila a un estado previo (con precauciones).
- Registro del statement SQL exacto: usar contrib modules o pg_stat_activity para capturar query text.
- Soporte multi-schema y auditoría configurables por tabla usando metadatos.
Para continuar, prueba implementar un job de particionado por mes para audit.log_entries y una función que exporte a S3 archivos comprimidos de logs antiguos. Y recuerda: si vas a guardar datos sensibles en logs, diseña una política de encriptación y retención antes de pasarlo a producción.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación