Auditoría de tablas en PostgreSQL con triggers, particionado y retención
En este tutorial práctico crearás un sistema de auditoría a nivel de base de datos usando triggers en PostgreSQL. Cubriremos:
- Diseño de una tabla de auditoría eficiente (jsonb + metadatos).
- Triggers y funciones PL/pgSQL que registran INSERT/UPDATE/DELETE.
- Particionado por tiempo para escalabilidad.
- Estrategia de retención y archivo de particiones antiguas.
- Demo con Node.js para generar cambios y verificar el log.
Requisitos previos
- PostgreSQL 12+ (se usan particiones declarativas; pg_cron opcional).
- psql o pgAdmin para ejecutar SQL.
- Node.js 14+ y npm (solo para demo opcional).
Estructura de carpetas
audit-project/
├─ sql/
│ ├─ 01_create_schema.sql
│ ├─ 02_audit_function_and_triggers.sql
│ ├─ 03_create_partitions.sql
│ └─ 04_retention_and_archive.sql
├─ demo/
│ ├─ package.json
│ └─ demo_app.js
└─ README.md
1) Crear esquema y tabla de ejemplo
Archivo: sql/01_create_schema.sql
-- Crear esquema y tabla de ejemplo
CREATE SCHEMA IF NOT EXISTS app;
CREATE TABLE IF NOT EXISTS app.orders (
id BIGSERIAL PRIMARY KEY,
customer_id INT NOT NULL,
amount NUMERIC(12,2) NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
Por qué: mantener las tablas de aplicación normalizadas y con timestamps te permite auditar cuándo sucedieron los cambios. El trigger de auditoría recogerá datos del BEFORE/AFTER.
2) Tabla de auditoría y función trigger
Archivo: sql/02_audit_function_and_triggers.sql
-- Tabla de auditoría (declarativa para particionar después por rango de tiempo)
CREATE SCHEMA IF NOT EXISTS audit;
CREATE TABLE IF NOT EXISTS audit.table_audit (
audit_id BIGSERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
row_id TEXT, -- id de la fila auditada (string para flexibilidad)
operation CHAR(1) NOT NULL, -- I/U/D
changed_by TEXT, -- usuario o sistema
changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
row_data JSONB, -- snapshot de la fila (NEW o OLD)
changed_columns TEXT[] -- columnas afectadas en UPDATE
) PARTITION BY RANGE (changed_at);
-- Función genérica de auditoría
CREATE OR REPLACE FUNCTION audit.fn_table_audit() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
v_row JSONB;
v_op CHAR(1);
v_row_id TEXT;
v_changed_cols TEXT[] := NULL;
BEGIN
IF tg_op = 'DELETE' THEN
v_op := 'D';
v_row := to_jsonb(OLD);
v_row_id := COALESCE(OLD.id::text, NULL);
ELSIF tg_op = 'UPDATE' THEN
v_op := 'U';
v_row := to_jsonb(NEW);
v_row_id := COALESCE(NEW.id::text, NULL);
-- detectar columnas cambiadas (opcional y costoso en tablas muy grandes)
SELECT array_agg(col) INTO v_changed_cols
FROM (SELECT a.attname AS col
FROM pg_attribute a
WHERE a.attrelid = TG_RELID
AND a.attnum > 0
AND NOT a.attisdropped) cols
WHERE (to_jsonb(OLD) ->> cols.col) IS DISTINCT FROM (to_jsonb(NEW) ->> cols.col);
ELSIF tg_op = 'INSERT' THEN
v_op := 'I';
v_row := to_jsonb(NEW);
v_row_id := COALESCE(NEW.id::text, NULL);
END IF;
-- Inserción en la tabla de auditoría
INSERT INTO audit.table_audit(table_name, row_id, operation, changed_by, changed_at, row_data, changed_columns)
VALUES (TG_TABLE_NAME, v_row_id, v_op, current_user, now(), v_row, v_changed_cols);
-- Si es trigger BEFORE, devolver NEW u OLD según corresponda
IF tg_op = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$;
-- Crear trigger en la tabla de ejemplo
DROP TRIGGER IF EXISTS trg_audit_orders ON app.orders;
CREATE TRIGGER trg_audit_orders
AFTER INSERT OR UPDATE OR DELETE ON app.orders
FOR EACH ROW EXECUTE FUNCTION audit.fn_table_audit();
Por qué:
- Usamos JSONB para almacenar el snapshot completo: flexible, indexable y evita tener que modificar la tabla de auditoría cada vez que cambie el esquema.
- Guardamos changed_columns para consultas rápidas de qué campos cambiaron (opcional; tiene coste).
- Trigger AFTER porque queremos registrar el resultado final y evitar interferir con la transacción (aunque todavía forma parte de la misma transacción).
3) Particionado por tiempo
Archivo: sql/03_create_partitions.sql
-- Crear particiones por mes; puedes crear un script que genere particiones automáticamente.
-- Ejemplo: crear partición para 2026-02
CREATE TABLE IF NOT EXISTS audit.table_audit_2026_02 PARTITION OF audit.table_audit
FOR VALUES FROM ('2026-02-01 00:00:00+00') TO ('2026-03-01 00:00:00+00');
-- Índices en partición (recomiendo índices locales por partición)
CREATE INDEX IF NOT EXISTS idx_audit_table_name_2026_02 ON audit.table_audit_2026_02 (table_name);
CREATE INDEX IF NOT EXISTS idx_audit_changed_at_2026_02 ON audit.table_audit_2026_02 (changed_at DESC);
-- Script SQL para crear la partición mensual (ejecutar desde cron o procedimiento)
CREATE OR REPLACE FUNCTION audit.create_month_partition(p_year INT, p_month INT) RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
start_ts TIMESTAMPTZ := make_timestamptz(p_year, p_month, 1, 0, 0, 0);
end_ts TIMESTAMPTZ := (start_ts + INTERVAL '1 month');
part_name TEXT := format('table_audit_%s_%s', p_year, lpad(p_month::text, 2, '0'));
create_sql TEXT;
BEGIN
create_sql := format('CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.table_audit FOR VALUES FROM (%L) TO (%L);',
part_name, start_ts::text, end_ts::text);
EXECUTE create_sql;
-- crear índices locales mínimos
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%s_table_name ON audit.%I (table_name);', part_name, part_name);
EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%s_changed_at ON audit.%I (changed_at DESC);', part_name, part_name);
END;
$$;
Por qué:
- Particionar por fecha mantiene cada partición compacta y permite borrar/archivar particiones completas muy rápido (DROP / ATTACH / DETACH).
- Índices locales evitan que un índice global crezca indefinidamente y permiten operaciones paralelas por partición.
4) Retención y archivo de particiones
Archivo: sql/04_retention_and_archive.sql
-- Función que elimina o archiva particiones antiguas (ejemplo: retener 6 meses)
CREATE OR REPLACE FUNCTION audit.retain_partitions(p_retain_months INT) RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
cutoff TIMESTAMPTZ := (date_trunc('month', now()) - (p_retain_months || ' months')::interval);
r RECORD;
part_name TEXT;
BEGIN
FOR r IN
SELECT inhrelid::regclass::text AS part
FROM pg_inherits
WHERE inhparent = 'audit.table_audit'::regclass
LOOP
part_name := r.part;
-- Extraer lower bound de la partición (simple heurística: nombre)
-- Aquí usamos naming convention table_audit_YYYY_MM
IF part_name ~ 'table_audit_([0-9]{4})_([0-9]{2})$' THEN
PERFORM
(CASE WHEN to_date(regexp_replace(part_name, '.*_(\d{4})_(\d{2})$', '\1-\2-01'), 'YYYY-MM-DD')::timestamptz < cutoff
THEN
-- Detach y mover a esquema archive para su export
EXECUTE format('ALTER TABLE %s SET SCHEMA audit_archive;', part_name)
ELSE NULL END);
END IF;
END LOOP;
END;
$$;
-- Nota: audit_archive esquema debe existir y puede almacenarse en otra DB o exportarse a CSV con pg_dump/ COPY.
Por qué:
- Detaching + mover a otro esquema es rápido. Después puedes exportar con COPY TO STDOUT o pg_dump y eliminar la partición si lo deseas.
- Evita borrar fila a fila y reduce bloat; las operaciones por partición son mucho más eficientes.
5) Demo: generar cambios desde Node.js
Archivos: demo/package.json y demo/demo_app.js
{
"name": "audit-demo",
"version": "1.0.0",
"dependencies": {
"pg": "^8.0.0"
}
}
// demo/demo_app.js
const { Client } = require('pg');
(async () => {
const client = new Client({
host: 'localhost',
port: 5432,
database: 'your_db',
user: 'your_user',
password: 'your_password'
});
await client.connect();
// Insert
const res1 = await client.query(
`INSERT INTO app.orders (customer_id, amount, status) VALUES ($1, $2, $3) RETURNING id`,
[42, 199.99, 'pending']
);
const id = res1.rows[0].id;
console.log('Inserted order id', id);
// Update
await client.query(`UPDATE app.orders SET status = $1, amount = $2, updated_at = now() WHERE id = $3`, ['paid', 199.99, id]);
console.log('Updated order', id);
// Delete
await client.query(`DELETE FROM app.orders WHERE id = $1`, [id]);
console.log('Deleted order', id);
// Consultar auditoría
const audit = await client.query(`SELECT audit_id, table_name, operation, changed_at, row_data FROM audit.table_audit ORDER BY changed_at DESC LIMIT 10`);
console.log('Recent audit rows:', audit.rows);
await client.end();
})();
Por qué:
- La demo ilustra el flujo completo: acciones en la tabla de aplicación y registro en la tabla de auditoría.
- En producción, usa roles dedicados y un pool de conexiones.
Consideraciones de diseño y rendimiento
- Triggers síncronos: la inserción en la tabla de auditoría forma parte de la misma transacción. Es seguro (consistencia), pero añade latencia. Para cargas elevadas considera un enfoque asíncrono (logical decoding / streaming, o escribir en una cola lightweight desde un trigger NOTIFY y consumirlo fuera de la transacción).
- JSONB vs columnas: JSONB da flexibilidad para esquemas cambiantes. Si necesitas consultas frecuentes por campos específicos, crea índices GIN/BTREE sobre (row_data ->> 'campo') o almacena columnas críticas desnormalizadas en la tabla_audit.
- Bloat y VACUUM: las particiones ayudan a mantener tablas más pequeñas. Aun así, monitoriza autovacuum y planifica vacuum full/rewrites si hay churn intenso.
- Seguridad y privacidad: no registres datos sensibles sin cifrado o enmascarado. Controla acceso al esquema audit.* y a dumps de particiones archivadas.
Consultas útiles
-- Obtener cambios de una fila en orden cronológico
SELECT changed_at, operation, changed_by, row_data
FROM audit.table_audit
WHERE table_name = 'orders' AND row_id = '123'
ORDER BY changed_at DESC;
-- Buscar cuando una columna específica cambió (si guardamos changed_columns)
SELECT * FROM audit.table_audit
WHERE table_name = 'orders' AND 'status' = ANY(changed_columns)
ORDER BY changed_at DESC LIMIT 50;
Por qué: estas consultas te permiten reconstruir la historia de una entidad y atender requisitos regulatorios o de debugging.
Próximo paso avanzado: si necesitas una solución de auditoría a escala con baja latencia, considera replicación lógica (pg_logical o plugins como wal2json) para capturar cambios fuera de la transacción y procesarlos con consumidores asíncronos. Y recuerda: nunca almacenes datos sensibles sin una política clara de cifrado y acceso.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación