Proyecto práctico: Auditoría de cambios en PostgreSQL (triggers, particionamiento y consultas eficientes)

sql Proyecto práctico: Auditoría de cambios en PostgreSQL (triggers, particionamiento y consultas eficientes)

Proyecto práctico: Auditoría de cambios en PostgreSQL (triggers, particionamiento y consultas eficientes)

Este tutorial te guía para implementar un sistema de auditoría (audit log) en PostgreSQL usando triggers en PL/pgSQL, particionamiento por fecha, y consultas para recuperar y reconstruir estados históricos. Está pensado para entornos donde necesitas trazabilidad de quien cambió qué y cuándo, con buena performance y políticas de retención.

Requisitos previos

  • PostgreSQL 12+ (idealmente 13+). Algunos fragmentos usan funciones como txid_current().
  • Acceso psql o cliente SQL con permisos para crear esquemas, funciones y triggers.
  • Conocimientos básicos de JSONB en PostgreSQL.
  • Opcional: pg_cron o un sistema de cron para automatizar limpieza de particiones.

Estructura de carpetas del proyecto

project-audit-postgres/
├─ sql/
│  ├─ 01_init.sql            -- esquema base y tabla de ejemplo
│  ├─ 02_audit_triggers.sql  -- tablas de auditoría, función trigger y triggers
│  ├─ 03_queries.sql         -- funciones para consultar y reconstruir filas
│  └─ 04_retention.sql       -- particionamiento y job de retención
└─ README.md

1) Crear esquema y tabla de ejemplo (sql/01_init.sql)

-- sql/01_init.sql
CREATE SCHEMA IF NOT EXISTS app;
CREATE SCHEMA IF NOT EXISTS audit;

-- Tabla de ejemplo: customers
CREATE TABLE IF NOT EXISTS app.customers (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  email TEXT UNIQUE,
  meta JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Trigger para mantener updated_at automatizado
CREATE OR REPLACE FUNCTION app.set_updated_at()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  NEW.updated_at := now();
  RETURN NEW;
END;
$$;

DROP TRIGGER IF EXISTS trg_set_updated_at ON app.customers;
CREATE TRIGGER trg_set_updated_at
BEFORE UPDATE ON app.customers
FOR EACH ROW EXECUTE FUNCTION app.set_updated_at();

-- Datos de ejemplo
INSERT INTO app.customers (name, email, meta) VALUES
('Alice', 'alice@example.com', '{"tier": "gold"}'),
('Bob',   'bob@example.com',   '{"tier": "silver"}');

Por qué: separar esquemas (app y audit) ayuda a controlar permisos y a orientar el mantenimiento. La tabla de ejemplo facilita probar el sistema.

2) Tablas de auditoría, particionamiento y triggers (sql/02_audit_triggers.sql)

-- sql/02_audit_triggers.sql
-- Tabla de auditoría particionada por fecha
CREATE TABLE IF NOT EXISTS audit.row_audit (
  id BIGSERIAL PRIMARY KEY,
  table_name TEXT NOT NULL,
  record_id JSONB NOT NULL,        -- permite claves simples o compuestas
  operation CHAR(1) NOT NULL,      -- 'I' | 'U' | 'D'
  changed_by TEXT,                 -- aplicación/usuario
  changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  changed_data JSONB NOT NULL,     -- estructura con 'old' y 'new' o solo cambiadas
  changed_columns TEXT[],
  transaction_id BIGINT
) PARTITION BY RANGE (changed_at);

-- función para crear particiones diarias (si no existen)
CREATE OR REPLACE FUNCTION audit.create_partition_for_day(p_day DATE)
RETURNS TEXT LANGUAGE plpgsql AS $$
DECLARE
  partition_name TEXT := format('audit.row_audit_%s', to_char(p_day, 'YYYYMMDD'));
  create_sql TEXT;
BEGIN
  IF NOT EXISTS (
    SELECT 1 FROM pg_class WHERE relname = partition_name
  ) THEN
    create_sql := format(
      'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit.row_audit FOR VALUES FROM (''%s'') TO (''%s'')',
      partition_name, p_day::text, (p_day + 1)::text
    );
    EXECUTE create_sql;
    -- index useful on record_id -> value extraction depending on PK structure
    EXECUTE format('CREATE INDEX ON %I ((record_id ->> ''id''))', partition_name);
  END IF;
  RETURN partition_name;
END;
$$;

-- trigger function genérica para tablas que queramos auditar
CREATE OR REPLACE FUNCTION audit.row_audit_trigger()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
  rec_old JSONB;
  rec_new JSONB;
  cols TEXT[] := ARRAY[]::TEXT[];
  key TEXT;
  val_old TEXT;
  val_new TEXT;
  p_name TEXT;
  rec_id JSONB;
BEGIN
  -- convertir filas a jsonb para comparar
  IF TG_OP = 'DELETE' THEN
    rec_old := to_jsonb(OLD);
    rec_new := '{}'::jsonb;
    -- asume PK 'id' si existe; para tablas con PK compuesta ajustar
    IF (OLD.*) IS NOT NULL AND OLD.id IS NOT NULL THEN
      rec_id := jsonb_build_object('id', OLD.id);
    ELSE
      rec_id := jsonb_build_object();
    END IF;
    cols := (SELECT array_agg(k) FROM jsonb_each_text(rec_old) AS t(k,v));
    INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
    VALUES (TG_TABLE_NAME, rec_id, 'D', current_setting('audit.user', true), now(), jsonb_build_object('old', rec_old), cols, txid_current());
    RETURN OLD;
  ELSIF TG_OP = 'INSERT' THEN
    rec_old := '{}'::jsonb;
    rec_new := to_jsonb(NEW);
    IF NEW.id IS NOT NULL THEN
      rec_id := jsonb_build_object('id', NEW.id);
    ELSE
      rec_id := jsonb_build_object();
    END IF;
    cols := (SELECT array_agg(k) FROM jsonb_each_text(rec_new) AS t(k,v));
    INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
    VALUES (TG_TABLE_NAME, rec_id, 'I', current_setting('audit.user', true), now(), jsonb_build_object('new', rec_new), cols, txid_current());
    RETURN NEW;
  ELSE
    -- UPDATE: calcular sólo columnas cambiadas
    rec_old := to_jsonb(OLD);
    rec_new := to_jsonb(NEW);
    FOR key, val_new IN SELECT * FROM jsonb_each_text(rec_new) LOOP
      val_old := (rec_old ->> key);
      IF val_old IS DISTINCT FROM val_new THEN
        cols := array_append(cols, key);
      END IF;
    END LOOP;
    IF array_length(cols,1) IS NULL THEN
      -- nada cambiado (por ejemplo trigger que actualizó updated_at manualmente), no insertar
      RETURN NEW;
    END IF;
    IF NEW.id IS NOT NULL THEN
      rec_id := jsonb_build_object('id', NEW.id);
    ELSE
      rec_id := jsonb_build_object();
    END IF;
    -- guardar solo las columnas cambiadas en old/new para reducir tamaño
    INSERT INTO audit.row_audit(table_name, record_id, operation, changed_by, changed_at, changed_data, changed_columns, transaction_id)
    VALUES (
      TG_TABLE_NAME,
      rec_id,
      'U',
      current_setting('audit.user', true),
      now(),
      jsonb_build_object('old', rec_old \#> ARRAY[]::TEXT[], 'new', rec_new) -- placeholder: we'll store full old/new for simplicity
      , cols,
      txid_current()
    );
    RETURN NEW;
  END IF;
END;
$$;

-- Nota: en la inserción anterior usamos un placeholder para 'old'/'new'. Para uso en producción adapta para extraer sólo claves en cols.

-- Crear trigger en la tabla de ejemplo
DROP TRIGGER IF EXISTS trg_audit_customers ON app.customers;
CREATE TRIGGER trg_audit_customers
AFTER INSERT OR UPDATE OR DELETE ON app.customers
FOR EACH ROW EXECUTE FUNCTION audit.row_audit_trigger();

-- Crear la partición para hoy
SELECT audit.create_partition_for_day(current_date);

Por qué:

  • Usar JSONB para record_id y changed_data nos da flexibilidad frente a PK simples o compuestas y evita múltiples columnas rígidas.
  • Particionar por fecha (changed_at) mejora scans y facilita borrar rangos antiguos sin bloquear la tabla principal.
  • Guardar changed_columns permite consultas rápidas sobre qué campos cambiaron.
  • El trigger filtra UPDATEs sin cambios para evitar ruido y sobrecarga.

3) Consultas y reconstrucción de estado (sql/03_queries.sql)

-- sql/03_queries.sql
-- Obtener historial para un registro
CREATE OR REPLACE FUNCTION audit.get_history(p_table TEXT, p_record_id JSONB)
RETURNS SETOF audit.row_audit LANGUAGE sql AS $$
  SELECT * FROM audit.row_audit
  WHERE table_name = p_table AND record_id = p_record_id
  ORDER BY changed_at;
$$;

-- Reconstruir estado de una fila hasta un instante dado
CREATE OR REPLACE FUNCTION audit.reconstruct_row(p_table TEXT, p_record_id JSONB, p_at TIMESTAMPTZ)
RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
  rec JSONB := '{}';
  r RECORD;
BEGIN
  FOR r IN
    SELECT * FROM audit.row_audit
    WHERE table_name = p_table AND record_id = p_record_id AND changed_at <= p_at
    ORDER BY changed_at
  LOOP
    IF r.operation = 'I' THEN
      -- si el insert guarda 'new' en changed_data
      IF (r.changed_data ? 'new') THEN
        rec := r.changed_data -> 'new';
      ELSE
        rec := r.changed_data;
      END IF;
    ELSIF r.operation = 'U' THEN
      -- preferimos si 'new' existe
      IF (r.changed_data ? 'new') THEN
        rec := rec || r.changed_data -> 'new';
      ELSE
        -- fallback: merge full new
        rec := rec || r.changed_data;
      END IF;
    ELSIF r.operation = 'D' THEN
      rec := NULL; -- eliminado
    END IF;
  END LOOP;
  RETURN rec;
END;
$$;

-- Ejemplo de uso:
-- SELECT audit.reconstruct_row('customers', jsonb_build_object('id', 1), now() - interval '1 hour');

Por qué: la función de reconstrucción aplica eventos en orden cronológico. Esto funciona porque en los inserts/updates guardamos (o deberíamos guardar) la forma completa o parcial de 'new'. Si quieres más robustez, almacena snapshots periódicos para acelerar reconstrucciones largas.

4) Retención y mantenimiento (sql/04_retention.sql)

-- sql/04_retention.sql
-- eliminar particiones antiguas mayores a N días (por ejemplo 90)
CREATE OR REPLACE FUNCTION audit.drop_partitions_older_than(p_days INTEGER)
RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
  threshold DATE := current_date - p_days;
  r RECORD;
  partname TEXT;
BEGIN
  FOR r IN
    SELECT relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'audit' AND relname LIKE 'row_audit_%'
  LOOP
    partname := r.relname;
    -- extraer fecha desde el nombre: row_audit_YYYYMMDD
    IF substring(partname FROM '\\d{8}$') IS NOT NULL THEN
      IF to_date(substring(partname FROM '\\d{8}$'), 'YYYYMMDD') < threshold THEN
        EXECUTE format('DROP TABLE IF EXISTS audit.%I CASCADE', partname);
      END IF;
    END IF;
  END LOOP;
END;
$$;

-- Ejemplo para agendar: ejecutar audit.drop_partitions_older_than(90) semanalmente via cron/pg_cron.

Por qué: borrar particiones es mucho más eficiente que borrar filas. Mantén un periodo de retención acorde a tus requisitos legales y de negocio.

Notas operacionales y buenas prácticas

  • Evita escribir datos sensibles en texto plano en la auditoría, o enrótalos/mascáralos: la auditoría suele tener menos restricciones de acceso.
  • Controla el tamaño de las particiones: crea particiones diarias si tienes volumen medio/alto; semanales si el volumen es bajo.
  • Para tablas con PK compuestas adapta la construcción de record_id en el trigger (por ejemplo incluir cada columna PK).
  • Considera añadir índices en las particiones según el patrón de consulta (p. ej. index sobre (table_name, (record_id->>'id'))).
  • Monitorea el impacto en latencia de escritura: triggers AFTER son menos intrusivos que BEFORE; sin embargo, escribir en la partición aún añade I/O.

Opciones avanzadas: si tienes un sistema de alta tasa de escritura, evalúa usar logical decoding (pgoutput/pg_recvlogical) o herramientas externas (Debezium, wal2json) para capturar cambios sin triggers dentro de la base de datos principal.

Seguridad: restringe SELECT/DELETE/INSERT en el esquema audit a roles de auditoría/ops; evita que aplicaciones puedan truncar o borrar logs sin autorización.

Próximo paso recomendado: adapta el trigger para almacenar únicamente los pares old/new de las columnas en changed_columns (en lugar de todo el objeto), e implementa snapshots periódicos (por ejemplo, nightly) para acelerar reconstrucciones en tablas con historial muy largo.

Consejo avanzado: para volúmenes muy altos, instrumenta un worker que lea el WAL (logical replication) y escriba en una base de datos de auditoría separada, evitando impacto en las transacciones OLTP críticas.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión