Proyecto práctico: Auditoría de cambios en PostgreSQL (triggers, particionamiento y consultas eficientes)
Este tutorial te guía para implementar un sistema de auditoría (audit log) en PostgreSQL usando triggers en PL/pgSQL, particionamiento por fecha, y consultas para recuperar y reconstruir estados históricos. Está pensado para entornos donde necesitas trazabilidad de quien cambió qué y cuándo, con buena performance y políticas de retención.
Requisitos previos
- PostgreSQL 12+ (idealmente 13+). Algunos fragmentos usan funciones como
txid_current(). - Acceso psql o cliente SQL con permisos para crear esquemas, funciones y triggers.
- Conocimientos básicos de JSONB en PostgreSQL.
- Opcional:
pg_crono un sistema de cron para automatizar limpieza de particiones.
Estructura de carpetas del proyecto
project-audit-postgres/
├─ sql/
│ ├─ 01_init.sql -- esquema base y tabla de ejemplo
│ ├─ 02_audit_triggers.sql -- tablas de auditoría, función trigger y triggers
│ ├─ 03_queries.sql -- funciones para consultar y reconstruir filas
│ └─ 04_retention.sql -- particionamiento y job de retención
└─ README.md
1) Crear esquema y tabla de ejemplo (sql/01_init.sql)
-- sql/01_init.sql
CREATE SCHEMA IF NOT EXISTS app;
CREATE SCHEMA IF NOT EXISTS audit;
-- Tabla de ejemplo: customers
CREATE TABLE IF NOT EXISTS app.customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
meta JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Trigger para mantener updated_at automatizado
CREATE OR REPLACE FUNCTION app.set_updated_at()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
NEW.updated_at := now();
RETURN NEW;
END;
$$;
DROP TRIGGER IF EXISTS trg_set_updated_at ON app.customers;
CREATE TRIGGER trg_set_updated_at
BEFORE UPDATE ON app.customers
FOR EACH ROW EXECUTE FUNCTION app.set_updated_at();
-- Datos de ejemplo
INSERT INTO app.customers (name, email, meta) VALUES
('Alice', 'alice@example.com', '{"tier": "gold"}'),
('Bob', 'bob@example.com', '{"tier": "silver"}');
Por qué: separar esquemas (app y audit) ayuda a controlar permisos y a orientar el mantenimiento. La tabla de ejemplo facilita probar el sistema.
2) Tablas de auditoría, particionamiento y triggers (sql/02_audit_triggers.sql)
-- sql/02_audit_triggers.sql
-- Tabla de auditoría particionada por fecha
CREATE TABLE IF NOT EXISTS audit.row_audit (
id BIGSERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
record_id JSONB NOT NULL, -- permite claves simples o compuestas
operation CHAR(1) NOT NULL, -- 'I' | 'U' | 'D'
changed_by TEXT, -- aplicación/usuario
changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
changed_data JSONB NOT NULL, -- estructura con 'old' y 'new' o solo cambiadas
changed_columns TEXT[],
transaction_id BIGINT
) PARTITION BY RANGE (changed_at);
-- función para crear particiones diarias (si no existen)
CREATE OR REPLACE FUNCTION audit.create_partition_for_day(p_day DATE)
RETURNS TEXT LANGUAGE plpgsql AS $$
DECLARE
partition_name TEXT := format('audit.row_audit_%s', to_char(p_day, 'YYYYMMDD'));
create_sql TEXT;
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
create_sql := format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit.row_audit FOR VALUES FROM (''%s'') TO (''%s'')',
partition_name, p_day::text, (p_day + 1)::text
);
EXECUTE create_sql;
-- index useful on record_id -> value extraction depending on PK structure
EXECUTE format('CREATE INDEX ON %I ((record_id ->> ''id''))', partition_name);
END IF;
RETURN partition_name;
END;
$$;
-- trigger function genérica para tablas que queramos auditar
CREATE OR REPLACE FUNCTION audit.row_audit_trigger()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
rec_old JSONB;
rec_new JSONB;
cols TEXT[] := ARRAY[]::TEXT[];
key TEXT;
val_old TEXT;
val_new TEXT;
p_name TEXT;
rec_id JSONB;
BEGIN
-- convertir filas a jsonb para comparar
IF TG_OP = 'DELETE' THEN
rec_old := to_jsonb(OLD);
rec_new := '{}'::jsonb;
-- asume PK 'id' si existe; para tablas con PK compuesta ajustar
IF (OLD.*) IS NOT NULL AND OLD.id IS NOT NULL THEN
rec_id := jsonb_build_object('id', OLD.id);
ELSE
rec_id := jsonb_build_object();
END IF;
cols := (SELECT array_agg(k) FROM jsonb_each_text(rec_old) AS t(k,v));
INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
VALUES (TG_TABLE_NAME, rec_id, 'D', current_setting('audit.user', true), now(), jsonb_build_object('old', rec_old), cols, txid_current());
RETURN OLD;
ELSIF TG_OP = 'INSERT' THEN
rec_old := '{}'::jsonb;
rec_new := to_jsonb(NEW);
IF NEW.id IS NOT NULL THEN
rec_id := jsonb_build_object('id', NEW.id);
ELSE
rec_id := jsonb_build_object();
END IF;
cols := (SELECT array_agg(k) FROM jsonb_each_text(rec_new) AS t(k,v));
INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
VALUES (TG_TABLE_NAME, rec_id, 'I', current_setting('audit.user', true), now(), jsonb_build_object('new', rec_new), cols, txid_current());
RETURN NEW;
ELSE
-- UPDATE: calcular sólo columnas cambiadas
rec_old := to_jsonb(OLD);
rec_new := to_jsonb(NEW);
FOR key, val_new IN SELECT * FROM jsonb_each_text(rec_new) LOOP
val_old := (rec_old ->> key);
IF val_old IS DISTINCT FROM val_new THEN
cols := array_append(cols, key);
END IF;
END LOOP;
IF array_length(cols,1) IS NULL THEN
-- nada cambiado (por ejemplo trigger que actualizó updated_at manualmente), no insertar
RETURN NEW;
END IF;
IF NEW.id IS NOT NULL THEN
rec_id := jsonb_build_object('id', NEW.id);
ELSE
rec_id := jsonb_build_object();
END IF;
-- guardar solo las columnas cambiadas en old/new para reducir tamaño
INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
VALUES (
TG_TABLE_NAME,
rec_id,
'U',
current_setting('audit.user', true),
now(),
jsonb_build_object('old', rec_old \#> ARRAY[]::TEXT[], 'new', rec_new) -- placeholder: we'll store full old/new for simplicity
, cols,
txid_current()
);
RETURN NEW;
END IF;
END;
$$;
-- Nota: en la inserción anterior usamos un placeholder para 'old'/'new'. Para uso en producción adapta para extraer sólo claves en cols.
-- Crear trigger en la tabla de ejemplo
DROP TRIGGER IF EXISTS trg_audit_customers ON app.customers;
CREATE TRIGGER trg_audit_customers
AFTER INSERT OR UPDATE OR DELETE ON app.customers
FOR EACH ROW EXECUTE FUNCTION audit.row_audit_trigger();
-- Crear la partición para hoy
SELECT audit.create_partition_for_day(current_date);
Por qué:
- Usar JSONB para
record_idychanged_datanos da flexibilidad frente a PK simples o compuestas y evita múltiples columnas rígidas. - Particionar por fecha (changed_at) mejora scans y facilita borrar rangos antiguos sin bloquear la tabla principal.
- Guardar
changed_columnspermite consultas rápidas sobre qué campos cambiaron. - El trigger filtra UPDATEs sin cambios para evitar ruido y sobrecarga.
3) Consultas y reconstrucción de estado (sql/03_queries.sql)
-- sql/03_queries.sql
-- Obtener historial para un registro
CREATE OR REPLACE FUNCTION audit.get_history(p_table TEXT, p_record_id JSONB)
RETURNS SETOF audit.row_audit LANGUAGE sql AS $$
SELECT * FROM audit.row_audit
WHERE table_name = p_table AND record_id = p_record_id
ORDER BY changed_at;
$$;
-- Reconstruir estado de una fila hasta un instante dado
CREATE OR REPLACE FUNCTION audit.reconstruct_row(p_table TEXT, p_record_id JSONB, p_at TIMESTAMPTZ)
RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
rec JSONB := '{}';
r RECORD;
BEGIN
FOR r IN
SELECT * FROM audit.row_audit
WHERE table_name = p_table AND record_id = p_record_id AND changed_at <= p_at
ORDER BY changed_at
LOOP
IF r.operation = 'I' THEN
-- si el insert guarda 'new' en changed_data
IF (r.changed_data ? 'new') THEN
rec := r.changed_data -> 'new';
ELSE
rec := r.changed_data;
END IF;
ELSIF r.operation = 'U' THEN
-- preferimos si 'new' existe
IF (r.changed_data ? 'new') THEN
rec := rec || r.changed_data -> 'new';
ELSE
-- fallback: merge full new
rec := rec || r.changed_data;
END IF;
ELSIF r.operation = 'D' THEN
rec := NULL; -- eliminado
END IF;
END LOOP;
RETURN rec;
END;
$$;
-- Ejemplo de uso:
-- SELECT audit.reconstruct_row('customers', jsonb_build_object('id', 1), now() - interval '1 hour');
Por qué: la función de reconstrucción aplica eventos en orden cronológico. Esto funciona porque en los inserts/updates guardamos (o deberíamos guardar) la forma completa o parcial de 'new'. Si quieres más robustez, almacena snapshots periódicos para acelerar reconstrucciones largas.
4) Retención y mantenimiento (sql/04_retention.sql)
-- sql/04_retention.sql
-- eliminar particiones antiguas mayores a N días (por ejemplo 90)
CREATE OR REPLACE FUNCTION audit.drop_partitions_older_than(p_days INTEGER)
RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
threshold DATE := current_date - p_days;
r RECORD;
partname TEXT;
BEGIN
FOR r IN
SELECT relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'audit' AND relname LIKE 'row_audit_%'
LOOP
partname := r.relname;
-- extraer fecha desde el nombre: row_audit_YYYYMMDD
IF substring(partname FROM '\\d{8}$') IS NOT NULL THEN
IF to_date(substring(partname FROM '\\d{8}$'), 'YYYYMMDD') < threshold THEN
EXECUTE format('DROP TABLE IF EXISTS audit.%I CASCADE', partname);
END IF;
END IF;
END LOOP;
END;
$$;
-- Ejemplo para agendar: ejecutar audit.drop_partitions_older_than(90) semanalmente via cron/pg_cron.
Por qué: borrar particiones es mucho más eficiente que borrar filas. Mantén un periodo de retención acorde a tus requisitos legales y de negocio.
Notas operacionales y buenas prácticas
- Evita escribir datos sensibles en texto plano en la auditoría, o enrótalos/mascáralos: la auditoría suele tener menos restricciones de acceso.
- Controla el tamaño de las particiones: crea particiones diarias si tienes volumen medio/alto; semanales si el volumen es bajo.
- Para tablas con PK compuestas adapta la construcción de
record_iden el trigger (por ejemplo incluir cada columna PK). - Considera añadir índices en las particiones según el patrón de consulta (p. ej. index sobre (table_name, (record_id->>'id'))).
- Monitorea el impacto en latencia de escritura: triggers AFTER son menos intrusivos que BEFORE; sin embargo, escribir en la partición aún añade I/O.
Opciones avanzadas: si tienes un sistema de alta tasa de escritura, evalúa usar logical decoding (pgoutput/pg_recvlogical) o herramientas externas (Debezium, wal2json) para capturar cambios sin triggers dentro de la base de datos principal.
Seguridad: restringe SELECT/DELETE/INSERT en el esquema audit a roles de auditoría/ops; evita que aplicaciones puedan truncar o borrar logs sin autorización.
Próximo paso recomendado: adapta el trigger para almacenar únicamente los pares old/new de las columnas en changed_columns (en lugar de todo el objeto), e implementa snapshots periódicos (por ejemplo, nightly) para acelerar reconstrucciones en tablas con historial muy largo.
Consejo avanzado: para volúmenes muy altos, instrumenta un worker que lea el WAL (logical replication) y escriba en una base de datos de auditoría separada, evitando impacto en las transacciones OLTP críticas.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación