Auditoría y versionado de datos en PostgreSQL: proyecto práctico con triggers, historial y retención

sql Auditoría y versionado de datos en PostgreSQL: proyecto práctico con triggers, historial y retención

Auditoría y versionado de datos en PostgreSQL: proyecto práctico

En este tutorial vas a construir un sistema de auditoría y versionado para una tabla de ejemplo (productos). Usaremos PostgreSQL, triggers y funciones PL/pgSQL para capturar cada cambio (INSERT/UPDATE/DELETE) en una tabla de historial. También incluiremos políticas de retención y ejemplos de consultas de "time travel".

Requisitos previos

  • PostgreSQL 12+ (se recomienda 13+). Si vas a usar gen_random_uuid() necesitas la extensión pgcrypto o uuid-ossp.
  • Acceso a psql o cliente equivalente.
  • Opcional: pg_cron o un cron externo para tareas de retención.
  • Python 3.8+ y psycopg2 si quieres probar el script de ejemplo.

Estructura de carpetas

audit-project/
  ├─ sql/
  │   ├─ 01_schema.sql
  │   ├─ 02_functions_triggers.sql
  │   ├─ 03_retention.sql
  ├─ examples/
  │   ├─ example_app.py
  │   └─ example_queries.sql
  └─ README.md

Diseño y decisiones

  • Mantener una tabla principal con el estado actual, y una tabla de historial con todos los snapshots previos. Es simple, robusto y rápido para consultas de auditoría.
  • Usamos triggers AFTER para capturar el estado anterior (OLD) en UPDATE/DELETE; en INSERT también guardamos una entrada si se desea (opcional).
  • Registraremos operation_type, changed_by y changed_at para auditoría forense.
  • Se añaden índices en la tabla de historial sobre product_id y changed_at para consultas frecuentes y retención por rango temporal.

Código: archivos SQL principales

sql/01_schema.sql

-- 01_schema.sql
CREATE EXTENSION IF NOT EXISTS pgcrypto; -- para gen_random_uuid()

-- Tabla principal
CREATE TABLE IF NOT EXISTS products (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  name TEXT NOT NULL,
  price NUMERIC(12,2) NOT NULL CHECK (price >= 0),
  stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Tabla de historial (snapshot por cambio)
CREATE TABLE IF NOT EXISTS products_history (
  history_id BIGSERIAL PRIMARY KEY,
  product_id UUID NOT NULL,
  name TEXT,
  price NUMERIC(12,2),
  stock INTEGER,
  operation_type TEXT NOT NULL,        -- 'INSERT'|'UPDATE'|'DELETE'
  changed_by TEXT,                    -- se puede recoger del contexto de sesión
  changed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Índices para consultas comunes
CREATE INDEX IF NOT EXISTS idx_products_history_product_id ON products_history(product_id);
CREATE INDEX IF NOT EXISTS idx_products_history_changed_at ON products_history(changed_at DESC);

sql/02_functions_triggers.sql

-- 02_functions_triggers.sql
-- Función para insertar en historial. Captura OLD en UPDATE/DELETE y también INSERT si quieres.
CREATE OR REPLACE FUNCTION audit_products_trigger() RETURNS TRIGGER AS $$
DECLARE
  v_user TEXT;
BEGIN
  -- Intentamos leer la variable de sesión 'audit.user' si existe; si no, usamos session_user
  v_user := current_setting('audit.user', true);
  IF v_user IS NULL THEN
    v_user := session_user;
  END IF;

  IF TG_OP = 'INSERT' THEN
    -- Opcional: registrar la inserción inicial
    INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
    VALUES (NEW.id, NEW.name, NEW.price, NEW.stock, 'INSERT', v_user, now());

    NEW.created_at := coalesce(NEW.created_at, now());
    NEW.updated_at := now();
    RETURN NEW;

  ELSIF TG_OP = 'UPDATE' THEN
    -- Guardamos el snapshot previo (OLD)
    INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
    VALUES (OLD.id, OLD.name, OLD.price, OLD.stock, 'UPDATE', v_user, now());

    NEW.updated_at := now();
    RETURN NEW;

  ELSIF TG_OP = 'DELETE' THEN
    -- Guardamos el estado antes de borrar
    INSERT INTO products_history(product_id, name, price, stock, operation_type, changed_by, changed_at)
    VALUES (OLD.id, OLD.name, OLD.price, OLD.stock, 'DELETE', v_user, now());

    RETURN OLD;
  END IF;

  RETURN NULL; -- no debería llegar aquí
END;
$$ LANGUAGE plpgsql;

-- Crear triggers sobre la tabla products
DROP TRIGGER IF EXISTS trg_audit_products ON products;
CREATE TRIGGER trg_audit_products
  AFTER INSERT OR UPDATE OR DELETE ON products
  FOR EACH ROW EXECUTE FUNCTION audit_products_trigger();

sql/03_retention.sql

-- 03_retention.sql
-- Función para purgar historial anterior a N días
CREATE OR REPLACE FUNCTION purge_products_history_older_than(days INTEGER) RETURNS INTEGER AS $$
DECLARE
  cutoff TIMESTAMPTZ := now() - (days || ' days')::INTERVAL;
  deleted_count INTEGER := 0;
BEGIN
  DELETE FROM products_history WHERE changed_at < cutoff RETURNING 1 INTO deleted_count;
  -- Si quieres la cuenta exacta:
  GET DIAGNOSTICS deleted_count = ROW_COUNT;
  RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;

-- Ejemplo de uso manual:
-- SELECT purge_products_history_older_than(180);

-- Si tienes pg_cron instalado, puedes programarlo:
-- SELECT cron.schedule('purge_history_daily', '0 3 * * *', 'SELECT purge_products_history_older_than(365)');

Ejemplo de uso desde una app (Python)

examples/example_app.py

#!/usr/bin/env python3
# Requiere psycopg2
import psycopg2

conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=localhost")
cur = conn.cursor()

# Establecemos un usuario de auditoría en la sesión para que el trigger lo lea
cur.execute("SET LOCAL audit.user = %s", ("service-backend:orders",))

# Insertar un producto
cur.execute("INSERT INTO products(name, price, stock) VALUES (%s, %s, %s) RETURNING id", ("Camisa", 19.99, 50))
product_id = cur.fetchone()[0]
print('Inserted', product_id)

# Actualizar
cur.execute("UPDATE products SET price = %s, stock = stock - 1 WHERE id = %s", (17.99, product_id))

# Borrar
cur.execute("DELETE FROM products WHERE id = %s", (product_id,))

conn.commit()
cur.close()
conn.close()

Al ejecutar este script verás entradas en products_history para cada operación, con changed_by recogido del contexto de sesión.

Consultas útiles

-- 1) Todas las versiones de un producto
SELECT * FROM products_history WHERE product_id = 'UUID-DEL-PRODUCTO' ORDER BY changed_at DESC;

-- 2) Estado de un producto en una fecha (time travel):
-- Tomamos la última snapshot anterior a la fecha; si no existe, el registro actual es el válido
WITH last_hist AS (
  SELECT * FROM products_history
  WHERE product_id = 'UUID-DEL-PRODUCTO' AND changed_at <= TIMESTAMP '2026-01-01 12:00:00'
  ORDER BY changed_at DESC LIMIT 1
)
SELECT COALESCE(last_hist.product_id, p.id) AS id,
       COALESCE(last_hist.name, p.name) AS name,
       COALESCE(last_hist.price, p.price) AS price,
       COALESCE(last_hist.stock, p.stock) AS stock
FROM last_hist
RIGHT JOIN products p ON p.id = COALESCE(last_hist.product_id, p.id)
WHERE COALESCE(last_hist.product_id, p.id) = 'UUID-DEL-PRODUCTO';

-- 3) Auditoría por usuario en rango de fechas
SELECT product_id, operation_type, changed_by, changed_at FROM products_history
WHERE changed_at BETWEEN now() - INTERVAL '30 days' AND now() AND changed_by = 'service-backend:orders'
ORDER BY changed_at DESC;

Por qué esta implementación

  • Triggers por fila garantizan que no se pierde ningún cambio en el proceso de escritura (atomicidad con la transacción en curso).
  • La tabla de historial contiene un snapshot completo por operación: esto facilita reconstruir estados y cumplir requisitos de auditoría, reportes y forense.
  • Almacenar changed_by mediante una variable de sesión (SET LOCAL audit.user = ...) permite atribuir cambios a servicios/usuarios sin acoplar la lógica de aplicación al trigger.

Optimización y operaciones

  • Si tu tabla es muy activa, considera batching en la limpieza de historial o mover datos antiguos a un tablespace más barato.
  • Para reducir write-amplification puedes omitir registrar operaciones que no cambien valores realmente (comprobar OLD vs NEW en el trigger).
  • Si necesitas consultas de "estado en un punto del tiempo" muy frecuentes y con alta latencia, valora usar una estrategia de event sourcing o materialized snapshots periódicos.

Seguridad y cumplimiento

  • El historial puede contener datos personales. Aplica encriptación en reposo o column-level encryption si aplica GDPR/CCPA.
  • Si el usuario puede pedir eliminación (derecho al olvido), tendrás que coordinar la eliminación desde la tabla de historial y mantener logs de auditoría separados si la ley lo exige.
  • Controla quién puede ejecutar la función de purga y quién puede leer la tabla de historial mediante roles y grants.

Para seguir: plantea integrar este patrón con tu pipeline de backups y con el sistema de monitoreo (alertas si el historial crece inesperadamente). Un consejo avanzado: si tu carga de escrituras es masiva, evalúa capturar cambios con logical replication (pg_recvlogical) y procesarlos en un sink que haga el versionado de manera asíncrona; así reduces latencia de writes pero pierdes la atomicidad directa del trigger.

Advertencia de seguridad: nunca confíes en variables de sesión sin autenticación/validación en tu capa de aplicación; valida y firma identificadores de actores cuando sea necesario.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión