Auditoría de tablas en PostgreSQL con triggers, particionado y retención

sql Auditoría de tablas en PostgreSQL con triggers, particionado y retención

Auditoría de tablas en PostgreSQL con triggers, particionado y retención

En este tutorial práctico crearás un sistema de auditoría a nivel de base de datos usando triggers en PostgreSQL. Cubriremos:

  • Diseño de una tabla de auditoría eficiente (jsonb + metadatos).
  • Triggers y funciones PL/pgSQL que registran INSERT/UPDATE/DELETE.
  • Particionado por tiempo para escalabilidad.
  • Estrategia de retención y archivo de particiones antiguas.
  • Demo con Node.js para generar cambios y verificar el log.

Requisitos previos

  • PostgreSQL 12+ (se usan particiones declarativas; pg_cron opcional).
  • psql o pgAdmin para ejecutar SQL.
  • Node.js 14+ y npm (solo para demo opcional).

Estructura de carpetas

audit-project/
├─ sql/
│  ├─ 01_create_schema.sql
│  ├─ 02_audit_function_and_triggers.sql
│  ├─ 03_create_partitions.sql
│  └─ 04_retention_and_archive.sql
├─ demo/
│  ├─ package.json
│  └─ demo_app.js
└─ README.md

1) Crear esquema y tabla de ejemplo

Archivo: sql/01_create_schema.sql

-- Crear esquema y tabla de ejemplo
CREATE SCHEMA IF NOT EXISTS app;

CREATE TABLE IF NOT EXISTS app.orders (
  id BIGSERIAL PRIMARY KEY,
  customer_id INT NOT NULL,
  amount NUMERIC(12,2) NOT NULL,
  status TEXT NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

Por qué: mantener las tablas de aplicación normalizadas y con timestamps te permite auditar cuándo sucedieron los cambios. El trigger de auditoría recogerá datos del BEFORE/AFTER.

2) Tabla de auditoría y función trigger

Archivo: sql/02_audit_function_and_triggers.sql

-- Tabla de auditoría (declarativa para particionar después por rango de tiempo)
CREATE SCHEMA IF NOT EXISTS audit;

CREATE TABLE IF NOT EXISTS audit.table_audit (
  audit_id BIGSERIAL PRIMARY KEY,
  table_name TEXT NOT NULL,
  row_id TEXT,                    -- id de la fila auditada (string para flexibilidad)
  operation CHAR(1) NOT NULL,     -- I/U/D
  changed_by TEXT,                -- usuario o sistema
  changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  row_data JSONB,                 -- snapshot de la fila (NEW o OLD)
  changed_columns TEXT[]          -- columnas afectadas en UPDATE
) PARTITION BY RANGE (changed_at);

-- Función genérica de auditoría
CREATE OR REPLACE FUNCTION audit.fn_table_audit() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
  v_row JSONB;
  v_op CHAR(1);
  v_row_id TEXT;
  v_changed_cols TEXT[] := NULL;
BEGIN
  IF tg_op = 'DELETE' THEN
    v_op := 'D';
    v_row := to_jsonb(OLD);
    v_row_id := COALESCE(OLD.id::text, NULL);
  ELSIF tg_op = 'UPDATE' THEN
    v_op := 'U';
    v_row := to_jsonb(NEW);
    v_row_id := COALESCE(NEW.id::text, NULL);

    -- detectar columnas cambiadas (opcional y costoso en tablas muy grandes)
    SELECT array_agg(col) INTO v_changed_cols
    FROM (SELECT a.attname AS col
          FROM pg_attribute a
          WHERE a.attrelid = TG_RELID
            AND a.attnum > 0
            AND NOT a.attisdropped) cols
    WHERE (to_jsonb(OLD) ->> cols.col) IS DISTINCT FROM (to_jsonb(NEW) ->> cols.col);

  ELSIF tg_op = 'INSERT' THEN
    v_op := 'I';
    v_row := to_jsonb(NEW);
    v_row_id := COALESCE(NEW.id::text, NULL);
  END IF;

  -- Inserción en la tabla de auditoría
  INSERT INTO audit.table_audit(table_name, row_id, operation, changed_by, changed_at, row_data, changed_columns)
  VALUES (TG_TABLE_NAME, v_row_id, v_op, current_user, now(), v_row, v_changed_cols);

  -- Si es trigger BEFORE, devolver NEW u OLD según corresponda
  IF tg_op = 'DELETE' THEN
    RETURN OLD;
  ELSE
    RETURN NEW;
  END IF;
END;
$$;

-- Crear trigger en la tabla de ejemplo
DROP TRIGGER IF EXISTS trg_audit_orders ON app.orders;
CREATE TRIGGER trg_audit_orders
  AFTER INSERT OR UPDATE OR DELETE ON app.orders
  FOR EACH ROW EXECUTE FUNCTION audit.fn_table_audit();

Por qué:

  • Usamos JSONB para almacenar el snapshot completo: flexible, indexable y evita tener que modificar la tabla de auditoría cada vez que cambie el esquema.
  • Guardamos changed_columns para consultas rápidas de qué campos cambiaron (opcional; tiene coste).
  • Trigger AFTER porque queremos registrar el resultado final y evitar interferir con la transacción (aunque todavía forma parte de la misma transacción).

3) Particionado por tiempo

Archivo: sql/03_create_partitions.sql

-- Crear particiones por mes; puedes crear un script que genere particiones automáticamente.
-- Ejemplo: crear partición para 2026-02
CREATE TABLE IF NOT EXISTS audit.table_audit_2026_02 PARTITION OF audit.table_audit
  FOR VALUES FROM ('2026-02-01 00:00:00+00') TO ('2026-03-01 00:00:00+00');

-- Índices en partición (recomiendo índices locales por partición)
CREATE INDEX IF NOT EXISTS idx_audit_table_name_2026_02 ON audit.table_audit_2026_02 (table_name);
CREATE INDEX IF NOT EXISTS idx_audit_changed_at_2026_02 ON audit.table_audit_2026_02 (changed_at DESC);

-- Script SQL para crear la partición mensual (ejecutar desde cron o procedimiento)
CREATE OR REPLACE FUNCTION audit.create_month_partition(p_year INT, p_month INT) RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
  start_ts TIMESTAMPTZ := make_timestamptz(p_year, p_month, 1, 0, 0, 0);
  end_ts TIMESTAMPTZ := (start_ts + INTERVAL '1 month');
  part_name TEXT := format('table_audit_%s_%s', p_year, lpad(p_month::text, 2, '0'));
  create_sql TEXT;
BEGIN
  create_sql := format('CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.table_audit FOR VALUES FROM (%L) TO (%L);',
                       part_name, start_ts::text, end_ts::text);
  EXECUTE create_sql;
  -- crear índices locales mínimos
  EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%s_table_name ON audit.%I (table_name);', part_name, part_name);
  EXECUTE format('CREATE INDEX IF NOT EXISTS idx_%s_changed_at ON audit.%I (changed_at DESC);', part_name, part_name);
END;
$$;

Por qué:

  • Particionar por fecha mantiene cada partición compacta y permite borrar/archivar particiones completas muy rápido (DROP / ATTACH / DETACH).
  • Índices locales evitan que un índice global crezca indefinidamente y permiten operaciones paralelas por partición.

4) Retención y archivo de particiones

Archivo: sql/04_retention_and_archive.sql

-- Función que elimina o archiva particiones antiguas (ejemplo: retener 6 meses)
CREATE OR REPLACE FUNCTION audit.retain_partitions(p_retain_months INT) RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
  cutoff TIMESTAMPTZ := (date_trunc('month', now()) - (p_retain_months || ' months')::interval);
  r RECORD;
  part_name TEXT;
BEGIN
  FOR r IN
    SELECT inhrelid::regclass::text AS part
    FROM pg_inherits
    WHERE inhparent = 'audit.table_audit'::regclass
  LOOP
    part_name := r.part;
    -- Extraer lower bound de la partición (simple heurística: nombre)
    -- Aquí usamos naming convention table_audit_YYYY_MM
    IF part_name ~ 'table_audit_([0-9]{4})_([0-9]{2})$' THEN
      PERFORM
        (CASE WHEN to_date(regexp_replace(part_name, '.*_(\d{4})_(\d{2})$', '\1-\2-01'), 'YYYY-MM-DD')::timestamptz < cutoff
         THEN
           -- Detach y mover a esquema archive para su export
           EXECUTE format('ALTER TABLE %s SET SCHEMA audit_archive;', part_name)
         ELSE NULL END);
    END IF;
  END LOOP;
END;
$$;

-- Nota: audit_archive esquema debe existir y puede almacenarse en otra DB o exportarse a CSV con pg_dump/ COPY.

Por qué:

  • Detaching + mover a otro esquema es rápido. Después puedes exportar con COPY TO STDOUT o pg_dump y eliminar la partición si lo deseas.
  • Evita borrar fila a fila y reduce bloat; las operaciones por partición son mucho más eficientes.

5) Demo: generar cambios desde Node.js

Archivos: demo/package.json y demo/demo_app.js

{
  "name": "audit-demo",
  "version": "1.0.0",
  "dependencies": {
    "pg": "^8.0.0"
  }
}
// demo/demo_app.js
const { Client } = require('pg');

(async () => {
  const client = new Client({
    host: 'localhost',
    port: 5432,
    database: 'your_db',
    user: 'your_user',
    password: 'your_password'
  });
  await client.connect();

  // Insert
  const res1 = await client.query(
    `INSERT INTO app.orders (customer_id, amount, status) VALUES ($1, $2, $3) RETURNING id`,
    [42, 199.99, 'pending']
  );
  const id = res1.rows[0].id;
  console.log('Inserted order id', id);

  // Update
  await client.query(`UPDATE app.orders SET status = $1, amount = $2, updated_at = now() WHERE id = $3`, ['paid', 199.99, id]);
  console.log('Updated order', id);

  // Delete
  await client.query(`DELETE FROM app.orders WHERE id = $1`, [id]);
  console.log('Deleted order', id);

  // Consultar auditoría
  const audit = await client.query(`SELECT audit_id, table_name, operation, changed_at, row_data FROM audit.table_audit ORDER BY changed_at DESC LIMIT 10`);
  console.log('Recent audit rows:', audit.rows);

  await client.end();
})();

Por qué:

  • La demo ilustra el flujo completo: acciones en la tabla de aplicación y registro en la tabla de auditoría.
  • En producción, usa roles dedicados y un pool de conexiones.

Consideraciones de diseño y rendimiento

  • Triggers síncronos: la inserción en la tabla de auditoría forma parte de la misma transacción. Es seguro (consistencia), pero añade latencia. Para cargas elevadas considera un enfoque asíncrono (logical decoding / streaming, o escribir en una cola lightweight desde un trigger NOTIFY y consumirlo fuera de la transacción).
  • JSONB vs columnas: JSONB da flexibilidad para esquemas cambiantes. Si necesitas consultas frecuentes por campos específicos, crea índices GIN/BTREE sobre (row_data ->> 'campo') o almacena columnas críticas desnormalizadas en la tabla_audit.
  • Bloat y VACUUM: las particiones ayudan a mantener tablas más pequeñas. Aun así, monitoriza autovacuum y planifica vacuum full/rewrites si hay churn intenso.
  • Seguridad y privacidad: no registres datos sensibles sin cifrado o enmascarado. Controla acceso al esquema audit.* y a dumps de particiones archivadas.

Consultas útiles

-- Obtener cambios de una fila en orden cronológico
SELECT changed_at, operation, changed_by, row_data
FROM audit.table_audit
WHERE table_name = 'orders' AND row_id = '123'
ORDER BY changed_at DESC;

-- Buscar cuando una columna específica cambió (si guardamos changed_columns)
SELECT * FROM audit.table_audit
WHERE table_name = 'orders' AND 'status' = ANY(changed_columns)
ORDER BY changed_at DESC LIMIT 50;

Por qué: estas consultas te permiten reconstruir la historia de una entidad y atender requisitos regulatorios o de debugging.

Próximo paso avanzado: si necesitas una solución de auditoría a escala con baja latencia, considera replicación lógica (pg_logical o plugins como wal2json) para capturar cambios fuera de la transacción y procesarlos con consumidores asíncronos. Y recuerda: nunca almacenes datos sensibles sin una política clara de cifrado y acceso.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión