Auditoría y versionado de datos en PostgreSQL: proyecto práctico
En este tutorial vas a construir un sistema de auditoría y versionado para una tabla de ejemplo (productos). Usaremos PostgreSQL, triggers y funciones PL/pgSQL para capturar cada cambio (INSERT/UPDATE/DELETE) en una tabla de historial. También incluiremos políticas de retención y ejemplos de consultas de "time travel".
Requisitos previos
- PostgreSQL 12+ (se recomienda 13+). Si vas a usar gen_random_uuid() necesitas la extensión
pgcryptoouuid-ossp. - Acceso a psql o cliente equivalente.
- Opcional:
pg_crono un cron externo para tareas de retención. - Python 3.8+ y
psycopg2si quieres probar el script de ejemplo.
Estructura de carpetas
audit-project/
├─ sql/
│ ├─ 01_schema.sql
│ ├─ 02_functions_triggers.sql
│ ├─ 03_retention.sql
├─ examples/
│ ├─ example_app.py
│ └─ example_queries.sql
└─ README.md
Diseño y decisiones
- Mantener una tabla principal con el estado actual, y una tabla de historial con todos los snapshots previos. Es simple, robusto y rápido para consultas de auditoría.
- Usamos triggers AFTER para capturar el estado anterior (OLD) en UPDATE/DELETE; en INSERT también guardamos una entrada si se desea (opcional).
- Registraremos
operation_type,changed_byychanged_atpara auditoría forense. - Se añaden índices en la tabla de historial sobre
product_idychanged_atpara consultas frecuentes y retención por rango temporal.
Código: archivos SQL principales
sql/01_schema.sql
-- 01_schema.sql
CREATE EXTENSION IF NOT EXISTS pgcrypto; -- para gen_random_uuid()
-- Tabla principal
CREATE TABLE IF NOT EXISTS products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
price NUMERIC(12,2) NOT NULL CHECK (price >= 0),
stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Tabla de historial (snapshot por cambio)
CREATE TABLE IF NOT EXISTS products_history (
history_id BIGSERIAL PRIMARY KEY,
product_id UUID NOT NULL,
name TEXT,
price NUMERIC(12,2),
stock INTEGER,
operation_type TEXT NOT NULL, -- 'INSERT'|'UPDATE'|'DELETE'
changed_by TEXT, -- se puede recoger del contexto de sesión
changed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Índices para consultas comunes
CREATE INDEX IF NOT EXISTS idx_products_history_product_id ON products_history(product_id);
CREATE INDEX IF NOT EXISTS idx_products_history_changed_at ON products_history(changed_at DESC);
sql/02_functions_triggers.sql
-- 02_functions_triggers.sql
-- Función para insertar en historial. Captura OLD en UPDATE/DELETE y también INSERT si quieres.
CREATE OR REPLACE FUNCTION audit_products_trigger() RETURNS TRIGGER AS $$
DECLARE
v_user TEXT;
BEGIN
-- Intentamos leer la variable de sesión 'audit.user' si existe; si no, usamos session_user
v_user := current_setting('audit.user', true);
IF v_user IS NULL THEN
v_user := session_user;
END IF;
IF TG_OP = 'INSERT' THEN
-- Opcional: registrar la inserción inicial
INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
VALUES (NEW.id, NEW.name, NEW.price, NEW.stock, 'INSERT', v_user, now());
NEW.created_at := coalesce(NEW.created_at, now());
NEW.updated_at := now();
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
-- Guardamos el snapshot previo (OLD)
INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
VALUES (OLD.id, OLD.name, OLD.price, OLD.stock, 'UPDATE', v_user, now());
NEW.updated_at := now();
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
-- Guardamos el estado antes de borrar
INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
VALUES (OLD.id, OLD.name, OLD.price, OLD.stock, 'DELETE', v_user, now());
RETURN OLD;
END IF;
RETURN NULL; -- no debería llegar aquí
END;
$$ LANGUAGE plpgsql;
-- Crear triggers sobre la tabla products
DROP TRIGGER IF EXISTS trg_audit_products ON products;
CREATE TRIGGER trg_audit_products
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION audit_products_trigger();
sql/03_retention.sql
-- 03_retention.sql
-- Función para purgar historial anterior a N días
CREATE OR REPLACE FUNCTION purge_products_history_older_than(days INTEGER) RETURNS INTEGER AS $$
DECLARE
cutoff TIMESTAMPTZ := now() - (days || ' days')::INTERVAL;
deleted_count INTEGER := 0;
BEGIN
DELETE FROM products_history WHERE changed_at < cutoff RETURNING 1 INTO deleted_count;
-- Si quieres la cuenta exacta:
GET DIAGNOSTICS deleted_count = ROW_COUNT;
RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
-- Ejemplo de uso manual:
-- SELECT purge_products_history_older_than(180);
-- Si tienes pg_cron instalado, puedes programarlo:
-- SELECT cron.schedule('purge_history_daily', '0 3 * * *', 'SELECT purge_products_history_older_than(365)');
Ejemplo de uso desde una app (Python)
examples/example_app.py
#!/usr/bin/env python3
# Requiere psycopg2
import psycopg2
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=localhost")
cur = conn.cursor()
# Establecemos un usuario de auditoría en la sesión para que el trigger lo lea
cur.execute("SET LOCAL audit.user = %s", ("service-backend:orders",))
# Insertar un producto
cur.execute("INSERT INTO products(name, price, stock) VALUES (%s, %s, %s) RETURNING id", ("Camisa", 19.99, 50))
product_id = cur.fetchone()[0]
print('Inserted', product_id)
# Actualizar
cur.execute("UPDATE products SET price = %s, stock = stock - 1 WHERE id = %s", (17.99, product_id))
# Borrar
cur.execute("DELETE FROM products WHERE id = %s", (product_id,))
conn.commit()
cur.close()
conn.close()
Al ejecutar este script verás entradas en products_history para cada operación, con changed_by recogido del contexto de sesión.
Consultas útiles
-- 1) Todas las versiones de un producto
SELECT * FROM products_history WHERE product_id = 'UUID-DEL-PRODUCTO' ORDER BY changed_at DESC;
-- 2) Estado de un producto en una fecha (time travel):
-- Tomamos la última snapshot anterior a la fecha; si no existe, el registro actual es el válido
WITH last_hist AS (
SELECT * FROM products_history
WHERE product_id = 'UUID-DEL-PRODUCTO' AND changed_at <= TIMESTAMP '2026-01-01 12:00:00'
ORDER BY changed_at DESC LIMIT 1
)
SELECT COALESCE(last_hist.product_id, p.id) AS id,
COALESCE(last_hist.name, p.name) AS name,
COALESCE(last_hist.price, p.price) AS price,
COALESCE(last_hist.stock, p.stock) AS stock
FROM last_hist
RIGHT JOIN products p ON p.id = COALESCE(last_hist.product_id, p.id)
WHERE COALESCE(last_hist.product_id, p.id) = 'UUID-DEL-PRODUCTO';
-- 3) Auditoría por usuario en rango de fechas
SELECT product_id, operation_type, changed_by, changed_at FROM products_history
WHERE changed_at BETWEEN now() - INTERVAL '30 days' AND now() AND changed_by = 'service-backend:orders'
ORDER BY changed_at DESC;
Por qué esta implementación
- Triggers por fila garantizan que no se pierde ningún cambio en el proceso de escritura (atomicidad con la transacción en curso).
- La tabla de historial contiene un snapshot completo por operación: esto facilita reconstruir estados y cumplir requisitos de auditoría, reportes y forense.
- Almacenar
changed_bymediante una variable de sesión (SET LOCAL audit.user = ...) permite atribuir cambios a servicios/usuarios sin acoplar la lógica de aplicación al trigger.
Optimización y operaciones
- Si tu tabla es muy activa, considera batching en la limpieza de historial o mover datos antiguos a un tablespace más barato.
- Para reducir write-amplification puedes omitir registrar operaciones que no cambien valores realmente (comprobar OLD vs NEW en el trigger).
- Si necesitas consultas de "estado en un punto del tiempo" muy frecuentes y con alta latencia, valora usar una estrategia de event sourcing o materialized snapshots periódicos.
Seguridad y cumplimiento
- El historial puede contener datos personales. Aplica encriptación en reposo o column-level encryption si aplica GDPR/CCPA.
- Si el usuario puede pedir eliminación (derecho al olvido), tendrás que coordinar la eliminación desde la tabla de historial y mantener logs de auditoría separados si la ley lo exige.
- Controla quién puede ejecutar la función de purga y quién puede leer la tabla de historial mediante roles y grants.
Para seguir: plantea integrar este patrón con tu pipeline de backups y con el sistema de monitoreo (alertas si el historial crece inesperadamente). Un consejo avanzado: si tu carga de escrituras es masiva, evalúa capturar cambios con logical replication (pg_recvlogical) y procesarlos en un sink que haga el versionado de manera asíncrona; así reduces latencia de writes pero pierdes la atomicidad directa del trigger.
Advertencia de seguridad: nunca confíes en variables de sesión sin autenticación/validación en tu capa de aplicación; valida y firma identificadores de actores cuando sea necesario.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación