Proyecto: Auditoría de cambios en PostgreSQL con triggers y API en Flask

sql Proyecto: Auditoría de cambios en PostgreSQL con triggers y API en Flask

Proyecto práctico: Auditoría de cambios en PostgreSQL con triggers y API en Flask

En este tutorial vas a construir un sistema de auditoría que registra INSERT, UPDATE y DELETE en tablas críticas de PostgreSQL usando funciones y triggers. Además publicaremos una API sencilla en Flask para consultar los registros de auditoría. Ideal para trazabilidad, cumplimiento y para facilitar debugging en producción.

Requisitos previos

  • PostgreSQL 12+ (local o en contenedor)
  • Python 3.8+
  • pip y virtualenv
  • Conocimientos básicos de SQL y Python

Estructura de carpetas

audit-project/
├─ sql/
│  ├─ schema.sql
│  └─ audit_triggers.sql
├─ app.py
├─ requirements.txt
└─ .env.example

Qué vas a crear

  • Tabla ejemplo users.
  • Tabla de auditoría genérica audit.log_entries que guarda metadata, operación, datos anteriores y nuevos (JSON).
  • Función PL/pgSQL que inserta en audit.log_entries desde triggers.
  • Triggers para INSERT/UPDATE/DELETE en la tabla users.
  • API Flask con endpoints para consultar logs por tabla, por id de fila y por rango de fechas.

Por qué usar este enfoque

Usar triggers a nivel de base de datos garantiza que ninguna aplicación omita la auditoría (consistencia). Guardar old y new como JSON permite reconstruir cambios y mantiene flexibilidad para cambios de esquema. Separar la tabla de auditoría en un schema dedicado (ej. audit) mantiene el modelo ordenado.

SQL: esquema y triggers

Archivo: sql/schema.sql

-- sql/schema.sql
CREATE SCHEMA IF NOT EXISTS audit;

-- Tabla de ejemplo
CREATE TABLE IF NOT EXISTS public.users (
    id SERIAL PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    email TEXT NOT NULL UNIQUE,
    active BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Tabla de auditoría
CREATE TABLE IF NOT EXISTS audit.log_entries (
    id BIGSERIAL PRIMARY KEY,
    schema_name TEXT NOT NULL,
    table_name TEXT NOT NULL,
    row_id TEXT,
    operation CHAR(1) NOT NULL, -- I/U/D
    changed_by TEXT,
    changed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    old_data JSONB,
    new_data JSONB,
    statement_text TEXT -- opcional, para query info si se desea
);

-- Índices para consultas frecuentes
CREATE INDEX IF NOT EXISTS idx_audit_table ON audit.log_entries (schema_name, table_name);
CREATE INDEX IF NOT EXISTS idx_audit_rowid ON audit.log_entries (table_name, row_id);
CREATE INDEX IF NOT EXISTS idx_audit_changed_at ON audit.log_entries (changed_at DESC);

Archivo: sql/audit_triggers.sql

-- sql/audit_triggers.sql
-- Función genérica de auditoría
CREATE OR REPLACE FUNCTION audit.log_change() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
    v_old JSONB;
    v_new JSONB;
    v_rowid TEXT;
BEGIN
    -- Serializar old/new a JSONB si existen
    IF TG_OP = 'DELETE' THEN
        v_old := to_jsonb(OLD);
        v_new := NULL;
        v_rowid := COALESCE(OLD.id::text, NULL);
    ELSIF TG_OP = 'INSERT' THEN
        v_old := NULL;
        v_new := to_jsonb(NEW);
        v_rowid := COALESCE(NEW.id::text, NULL);
    ELSE -- UPDATE
        v_old := to_jsonb(OLD);
        v_new := to_jsonb(NEW);
        v_rowid := COALESCE(NEW.id::text, OLD.id::text);
    END IF;

    -- Insertar entrada de audit
    INSERT INTO audit.log_entries(
        schema_name, table_name, row_id, operation, changed_by, changed_at, old_data, new_data, statement_text
    ) VALUES (
        TG_TABLE_SCHEMA,
        TG_TABLE_NAME,
        v_rowid,
        substr(TG_OP,1,1),
        current_setting('audit.current_user', true), -- valor opcional inyectado por app
        now(),
        v_old,
        v_new,
        NULL
    );

    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    ELSE
        RETURN NEW;
    END IF;
END;
$$;

-- Crear trigger para la tabla users
DROP TRIGGER IF EXISTS users_audit_trg ON public.users;
CREATE TRIGGER users_audit_trg
AFTER INSERT OR UPDATE OR DELETE ON public.users
FOR EACH ROW EXECUTE FUNCTION audit.log_change();

Archivo .env.example

# .env.example
DATABASE_URL=postgresql://user:password@localhost:5432/mydb
FLASK_ENV=development

API en Flask

Archivo: requirements.txt

Flask==2.2.5
psycopg2-binary==2.9.6
python-dotenv==1.0.0

Archivo principal: app.py

# app.py
import os
import json
from datetime import datetime
from flask import Flask, request, jsonify, abort
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv('DATABASE_URL')
if not DATABASE_URL:
    raise RuntimeError('DATABASE_URL must be set')

app = Flask(__name__)

def get_conn():
    return psycopg2.connect(DATABASE_URL)

@app.route('/audits', methods=['GET'])
def list_audits():
    """
    Parámetros opcionales:
      - table: nombre de tabla (ej. users)
      - row_id: id de fila
      - from: fecha ISO inicio
      - to: fecha ISO fin
      - limit: int
    """
    table = request.args.get('table')
    row_id = request.args.get('row_id')
    date_from = request.args.get('from')
    date_to = request.args.get('to')
    limit = int(request.args.get('limit') or 100)

    sql = ["SELECT id, schema_name, table_name, row_id, operation, changed_by, changed_at, old_data, new_data FROM audit.log_entries WHERE 1=1"]
    params = []
    if table:
        sql.append("AND table_name = %s")
        params.append(table)
    if row_id:
        sql.append("AND row_id = %s")
        params.append(row_id)
    if date_from:
        sql.append("AND changed_at >= %s")
        params.append(date_from)
    if date_to:
        sql.append("AND changed_at <= %s")
        params.append(date_to)
    sql.append("ORDER BY changed_at DESC LIMIT %s")
    params.append(limit)

    query = ' '.join(sql)

    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            rows = cur.fetchall()

    # Convertir JSONB a objetos JSON serializables
    for r in rows:
        if r['old_data'] is not None and not isinstance(r['old_data'], (dict, list)):
            r['old_data'] = json.loads(r['old_data'])
        if r['new_data'] is not None and not isinstance(r['new_data'], (dict, list)):
            r['new_data'] = json.loads(r['new_data'])

    return jsonify(rows)

@app.route('/audits/', methods=['GET'])
def get_audit(audit_id):
    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM audit.log_entries WHERE id = %s', (audit_id,))
            row = cur.fetchone()
            if not row:
                abort(404)
            # convertir JSONB
            if row['old_data'] is not None and not isinstance(row['old_data'], (dict, list)):
                row['old_data'] = json.loads(row['old_data'])
            if row['new_data'] is not None and not isinstance(row['new_data'], (dict, list)):
                row['new_data'] = json.loads(row['new_data'])
            return jsonify(row)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=os.getenv('FLASK_ENV') == 'development')

Cómo desplegarlo localmente

  1. Crear y activar virtualenv, instalar dependencias:
    python -m venv venv && source venv/bin/activate && pip install -r requirements.txt
  2. Configura .env con tu DATABASE_URL.
  3. Ejecuta SQL para crear esquema y triggers:
    psql $DATABASE_URL -f sql/schema.sql
    psql $DATABASE_URL -f sql/audit_triggers.sql
  4. Arranca la app: python app.py
  5. Prueba: insertar un usuario y consultar API:
    - Insertar: INSERT INTO public.users (username,email) VALUES ('alice','alice@example.com'); - Consultar: GET http://localhost:5000/audits?table=users

Explicaciones clave (el porqué)

  • Tabla audit.log_entries almacena old_data y new_data como JSONB porque: soporta cambios de esquema sin migrar la tabla de auditoría y facilita comparaciones y reconstrucción.
  • La función audit.log_change() se ejecuta AFTER para no interferir con la lógica de la operación; así la transacción original no cambia retornando NEW/OLD según corresponda.
  • Usamos current_setting('audit.current_user', true) para permitir que la aplicación inyecte el usuario que causa el cambio (ver nota más abajo). Si la variable no existe, devuelve NULL. Esto evita depender solo de la conexión DB para el actor.
  • Índices en columnas (table_name, row_id, changed_at) aceleran consultas más frecuentes al API.

Inyectar el usuario que hace el cambio

Si tu aplicación usa una conexión por request (pool), puedes ejecutar antes de la operación:

SET LOCAL audit.current_user = 'app_user:123';
-- luego realizar UPDATE/INSERT/DELETE en la misma transacción

Usar SET LOCAL dentro de la transacción evita persistir información entre conexiones.

Rendimiento y consideraciones

  • Triggers generan overhead. Para tablas con alta tasa de escritura considera:
  • Escribir a una cola (ej. Kafka) en lugar de sincronizar en la misma transacción si la latencia es crítica.
  • Limitar columnas auditoradas: serializa solo columnas relevantes o un diff en vez de todo el row si los objetos son grandes.
  • Archivar logs antiguos: particiona audit.log_entries por fecha o mueve a almacenamiento frío periódicamente.

Seguridad y cumplimiento

  • Protege acceso a la tabla audit.log_entries con roles y políticas (GRANT SELECT a equipos de auditoría, revoca a otros).
  • Si almacenas PII en old_data/new_data, considera enmascarar o cifrar campos sensibles antes de guardar.
  • Auditoría puede crear datos duplicados de PII: revisa políticas de retención (GDPR).

Posibles mejoras

  • Agregar endpoint que calcule un diff entre old/new y muestre solo campos cambiados.
  • Endpoint para revertir una fila a un estado previo (con precauciones).
  • Registro del statement SQL exacto: usar contrib modules o pg_stat_activity para capturar query text.
  • Soporte multi-schema y auditoría configurables por tabla usando metadatos.

Para continuar, prueba implementar un job de particionado por mes para audit.log_entries y una función que exporte a S3 archivos comprimidos de logs antiguos. Y recuerda: si vas a guardar datos sensibles en logs, diseña una política de encriptación y retención antes de pasarlo a producción.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión