Sistema de versionado y auditoría de filas en PostgreSQL
Este tutorial muestra cómo implementar un sistema robusto de auditoría y versionado por fila en PostgreSQL usando tablas de versiones y triggers. Tendrás:
- Una tabla principal con los datos actuales.
- Una tabla de versiones (append-only) que guarda cada cambio con rangos de validez (valid_from, valid_to).
- Funciones y triggers que garantizan atomicidad y metadatos útiles (usuario, txid).
- Consultas para reconstruir el estado a una fecha y obtener diffs usando JSONB.
Requisitos previos
- PostgreSQL 12+ (funciones usadas funcionan en 12+; recomiende 13+ para mejoras).
- Acceso a psql o una herramienta SQL que permita ejecutar scripts.
- Conocimientos básicos de PL/pgSQL y JSONB.
Estructura del proyecto
sql/
00_schema.sql -- crea tablas y extensión necesaria
01_functions.sql -- funciones PL/pgSQL para versionado
02_triggers.sql -- definición de triggers
03_indexes.sql -- índices y optimizaciones
04_examples.sql -- ejemplos de uso y consultas temporales
00_schema.sql — esquema principal
-- 00_schema.sql
-- Crea extensiones y tablas base
-- Recomendado para funciones como txid_current()
CREATE EXTENSION IF NOT EXISTS pgcrypto; -- (opcional, si usas gen_random_uuid())
-- Tabla principal: datos actuales
CREATE TABLE IF NOT EXISTS customers (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
data JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Tabla de versiones: guarda cada versión de una fila
CREATE TABLE IF NOT EXISTS customer_versions (
version_id BIGSERIAL PRIMARY KEY,
customer_id BIGINT NOT NULL,
operation CHAR(1) NOT NULL, -- 'I' = insert, 'U' = update, 'D' = delete
valid_from TIMESTAMPTZ NOT NULL,
valid_to TIMESTAMPTZ, -- NULL significa todavía vigente
data JSONB NOT NULL,
user_name TEXT,
txid BIGINT,
statement TEXT -- opcional: la sentencia (puede ser grande)
);
-- FK opcional (no obligatorio porque es historia append-only)
-- ALTER TABLE customer_versions ADD CONSTRAINT fk_customer FOREIGN KEY(customer_id) REFERENCES customers(id) ON DELETE NO ACTION;
01_functions.sql — función de versionado y ayudas
-- 01_functions.sql
CREATE OR REPLACE FUNCTION current_txid() RETURNS BIGINT LANGUAGE SQL IMMUTABLE AS $$
SELECT txid_current();
$$;
-- Función que registra versiones para INSERT/UPDATE/DELETE
CREATE OR REPLACE FUNCTION trg_customer_version() RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
v_data JSONB;
v_old JSONB;
v_user TEXT := current_user;
v_txid BIGINT := txid_current();
BEGIN
IF (TG_OP = 'INSERT') THEN
v_data := to_jsonb(NEW) - 'created_at' - 'updated_at';
INSERT INTO customer_versions(
customer_id, operation, valid_from, valid_to, data, user_name, txid, statement
) VALUES (
NEW.id, 'I', now(), NULL, v_data, v_user, v_txid, NULL
);
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
v_old := to_jsonb(OLD) - 'created_at' - 'updated_at';
v_data := to_jsonb(NEW) - 'created_at' - 'updated_at';
-- Cerrar la versión anterior (la que estaba vigente)
UPDATE customer_versions
SET valid_to = now()
WHERE customer_id = OLD.id AND valid_to IS NULL;
-- Insertar la nueva versión vigente
INSERT INTO customer_versions(
customer_id, operation, valid_from, valid_to, data, user_name, txid, statement
) VALUES (
NEW.id, 'U', now(), NULL, v_data, v_user, v_txid, NULL
);
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
v_old := to_jsonb(OLD) - 'created_at' - 'updated_at';
-- Cerrar la versión vigente
UPDATE customer_versions
SET valid_to = now()
WHERE customer_id = OLD.id AND valid_to IS NULL;
-- Registrar la eliminación como una versión final
INSERT INTO customer_versions(
customer_id, operation, valid_from, valid_to, data, user_name, txid, statement
) VALUES (
OLD.id, 'D', now(), now(), v_old, v_user, v_txid, NULL
);
RETURN OLD;
END IF;
RETURN NULL; -- no debería llegar aquí
END;
$$;
-- Función utilitaria: reconstruir el estado de un customer en un punto en el tiempo
CREATE OR REPLACE FUNCTION customer_at(p_customer_id BIGINT, p_when TIMESTAMPTZ)
RETURNS JSONB LANGUAGE SQL AS $$
SELECT data
FROM customer_versions
WHERE customer_id = p_customer_id
AND valid_from <= p_when
AND (valid_to IS NULL OR valid_to > p_when)
ORDER BY valid_from DESC
LIMIT 1;
$$;
02_triggers.sql — triggers de la tabla customers
-- 02_triggers.sql
-- Trigger AFTER para INSERT/UPDATE/DELETE
DROP TRIGGER IF EXISTS customer_version_trg ON customers;
CREATE TRIGGER customer_version_trg
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW EXECUTE FUNCTION trg_customer_version();
-- Trigger para mantener updated_at
CREATE OR REPLACE FUNCTION set_updated_at() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
IF TG_OP = 'UPDATE' THEN
NEW.updated_at := now();
RETURN NEW;
ELSIF TG_OP = 'INSERT' THEN
NEW.created_at := COALESCE(NEW.created_at, now());
NEW.updated_at := COALESCE(NEW.updated_at, now());
RETURN NEW;
END IF;
RETURN NULL;
END;
$$;
DROP TRIGGER IF EXISTS customers_set_updated_at ON customers;
CREATE TRIGGER customers_set_updated_at
BEFORE INSERT OR UPDATE ON customers
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
03_indexes.sql — índices recomendados
-- 03_indexes.sql
-- Índice para búsquedas por customer_id y rango temporal
CREATE INDEX IF NOT EXISTS idx_customer_versions_customer_time
ON customer_versions(customer_id, valid_from DESC);
-- Índice GIN para consultas dentro de JSONB
CREATE INDEX IF NOT EXISTS idx_customer_versions_data_gin
ON customer_versions USING GIN (data jsonb_path_ops);
-- Índice por txid si planeas analizar transacciones
CREATE INDEX IF NOT EXISTS idx_customer_versions_txid ON customer_versions(txid);
04_examples.sql — ejemplos de uso y consultas útiles
-- 04_examples.sql
-- Insertar clientes
INSERT INTO customers(name, email, data) VALUES ('Alice', 'alice@example.com', '{"tier": "gold"}');
INSERT INTO customers(name, email, data) VALUES ('Bob', 'bob@example.com', '{"tier": "silver"}');
-- Actualizar cliente
UPDATE customers SET data = jsonb_set(data, '{tier}', '"platinum"') WHERE email = 'alice@example.com';
-- Borrar cliente
DELETE FROM customers WHERE email = 'bob@example.com';
-- Ver versiones de Alice
SELECT * FROM customer_versions WHERE customer_id = 1 ORDER BY valid_from;
-- Reconstruir estado de Alice en el pasado
SELECT customer_at(1, now() - interval '1 hour');
-- Mostrar diffs simples entre versiones (ejemplo):
WITH v AS (
SELECT version_id, customer_id, data, valid_from
FROM customer_versions
WHERE customer_id = 1
ORDER BY valid_from DESC
LIMIT 2
)
SELECT
(SELECT data FROM v ORDER BY valid_from DESC LIMIT 1) AS newest,
(SELECT data FROM v ORDER BY valid_from ASC LIMIT 1) AS previous;
-- Consultar historial completo con metadatos
SELECT version_id, operation, valid_from, valid_to, user_name, txid, data
FROM customer_versions
WHERE customer_id = 1
ORDER BY valid_from DESC;
Por qué este diseño
- Separar la tabla de versiones (append-only) mantiene el histórico íntegro y simplifica auditoría y replicación.
- Guardar valid_from/valid_to permite consultas temporales (reconstruir estado a una fecha) sin depender de WAL/point-in-time.
- Capturar txid y user_name ayuda a correlacionar cambios con transacciones y usuarios para auditoría forense.
- Usar JSONB para el payload es flexible: puedes versionar estructuras complejas sin alterar esquema; además JSONB permite consultas y diffs relativamente eficientes.
Consideraciones operativas y rendimiento
- Si tu carga de escritura es grande, customer_versions crecerá rápido. Considera particionar por tiempo (POR RANGE(valid_from)) o por customer_id.
- Índices sobre (customer_id, valid_from) son vitales para consultas a punto en el tiempo.
- Si necesitas recuperar frecuentemente el conjunto «actual», mantener una tabla customers con la fila vigente (como aquí) es más rápido que consultar versiones cada vez.
- Revisa políticas de retención: en algunos entornos debes anonimizar o eliminar historiales por normativas (GDPR). No elimines versiones sin un proceso controlado.
Explicación técnica breve: por qué usar triggers AFTER y cerrar versiones con UPDATE
Triggers AFTER aseguran que la fila principal ya exista (o haya sido eliminada) y permiten usar datos finales de la fila. Al cerrar la versión previa con valid_to = now() y luego crear la nueva versión con valid_to NULL, garantizas una ventana temporal clara sin solapamiento (siempre que las actualizaciones sean secuenciales dentro de la misma transacción). Insertar la nueva versión después de cerrar la anterior mantiene invariantes fáciles de consultar (solo una con valid_to IS NULL por customer_id).
Limitaciones y casos extremos
- Operaciones masivas (bulk import) pueden causar mucha contención en customer_versions; usa COPY a tablas temporales y luego procesos por lotes para insertar versiones si es necesario.
- Si necesitas el SQL de la sentencia exacta, podrías usar session variables o logStatement, pero puede afectar rendimiento y volumen de datos.
- En replicación lógica, ten cuidado con triggers que escriben en tablas que también se replican; valida tu flujo de replicación.
Siguientes pasos (avanzado)
Si quieres evolucionar esto: implementa particionamiento por rango temporal en customer_versions, añade un mecanismo de compresión/archivado para versiones viejas, o integra row-level security para que solo ciertos roles puedan ver versiones. También puedes construir una API que exponga endpoints para consultar estados históricos y diffs con paginación eficiente.
Advertencia de seguridad: trata los datos históricos con la misma sensibilidad que los datos actuales. Decide una política de retención y anonimización antes de almacenar datos personales en tablas históricas, y controla el acceso a customer_versions mediante roles y permisos.
Un consejo avanzado: si necesitas reconstrucción rápida del estado de millones de filas a un punto en el tiempo, considera snapshots periódicos (materialized views o tablas de checkpoint) y luego aplicar las versiones intermedias sólo desde el checkpoint más cercano en adelante.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación