Patrón práctico: pipeline asíncrono en Rust con Tokio, backpressure y cancelación

rust Patrón práctico: pipeline asíncrono en Rust con Tokio, backpressure y cancelación

Patrón práctico: pipeline asíncrono en Rust con Tokio, backpressure y cancelación

En este post veremos cómo construir un pipeline asíncrono y robusto en Rust usando tokio. Cubriremos:

  • Cómo aplicar backpressure con canales acotados (mpsc::channel).
  • Cómo limitar concurrencia con Semaphore para procesamiento paralelo controlado.
  • Cancelación y apagado ordenado con señales (broadcast / oneshot).
  • Buenas prácticas: spawn_blocking, tracing y tests con tokio::test.

Escenario

Tienes un productor que genera trabajos, un pool de workers que los procesan (I/O o CPU bounded), y quieres:

  • Evitar que el productor inunde memoria (backpressure).
  • Poder cancelar todo rápidamente (shutdown).
  • Limitar la concurrencia de procesamiento para no agotar recursos.

Esqueleto del pipeline

Usaremos un canal mpsc acotado para backpressure, un Semaphore para limitar el paralelismo y un canal broadcast para anunciar shutdown a todos.

use tokio::sync::{broadcast, mpsc, Semaphore};
use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[derive(Debug)]
struct Job { id: u64, payload: String }

async fn producer(mut tx: mpsc::Sender, mut shutdown_rx: broadcast::Receiver<()>) {
    let mut id = 0u64;
    loop {
        // Puedes usar select! para priorizar shutdown
        tokio::select! {
            biased; // comprueba shutdown primero
            _ = shutdown_rx.recv() => {
                tracing::info!("producer: received shutdown");
                break;
            }
            _ = sleep(Duration::from_millis(50)) => {
                id += 1;
                let job = Job { id, payload: format!("payload-{}", id) };
                // Si el canal está lleno, await aquí aplica backpressure al productor
                if let Err(_) = tx.send(job).await {
                    tracing::warn!("producer: receiver dropped, exiting");
                    break;
                }
            }
        }
    }
    tracing::info!("producer: exiting");
}

async fn worker(
    mut rx: mpsc::Receiver,
    shutdown_tx: broadcast::Sender<()>,
    concurrency_limit: Arc
) {
    while let Some(job) = rx.recv().await {
        // Antes de procesar, adquiere un permiso (limita concurrencia)
        let permit = concurrency_limit.acquire().await.unwrap();
        let shutdown_tx = shutdown_tx.clone();
        tokio::spawn(async move {
            // Si la tarea hace I/O bloqueante, usar spawn_blocking
            process_job(job).await;
            drop(permit); // libera permit
            // opcional: emitir métricas o eventos
            if false {
                // ejemplo de uso: notificar algo
                let _ = shutdown_tx.send(());
            }
        });
    }
    tracing::info!("worker: receiver closed, exiting");
}

async fn process_job(job: Job) {
    // Simula trabajo I/O-bound
    tracing::info!("processing {}", job.id);
    sleep(Duration::from_millis(200)).await;
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Canal acotado: backpressure si producer produce más rápido de lo que los workers procesan
    let (tx, rx) = mpsc::channel(16);
    let (shutdown_tx, _) = broadcast::channel(1);

    let concurrency_limit = Arc::new(Semaphore::new(4));

    // Lanza worker(s) que consumirán del mismo receptor: clona el receiver si quieres particionar
    let worker_rx = rx; // en el ejemplo usamos un solo receptor y spawn para procesar en paralelo

    let producer_shutdown_rx = shutdown_tx.subscribe();
    let prod = tokio::spawn(producer(tx.clone(), producer_shutdown_rx));
    let work = tokio::spawn(worker(worker_rx, shutdown_tx.clone(), concurrency_limit.clone()));

    // Ejecuta un tiempo y luego solicita shutdown
    tokio::time::sleep(Duration::from_secs(5)).await;
    // Envía la señal (puede fallar si no hay listeners)
    let _ = shutdown_tx.send(());

    // Espera a que terminen
    let _ = prod.await;
    let _ = work.await;
    tracing::info!("main: finished");
}

Explicación rápida

  • mpsc::channel(16): un canal de capacidad 16 ofrece backpressure: cuando está lleno, send().await espera hasta que haya espacio.
  • Semaphore limita el número simultáneo de tareas reales en ejecución (aquí 4).
  • broadcast::channel se usa para anunciar shutdown a múltiples listeners; subscribe() crea receptores independientes.
  • spawn para cada unidad de trabajo permite que el receptor sea rápido en aceptar nuevos mensajes y delegar procesamiento.

Cancelación y shutdown ordenado

La clave es priorizar la recepción de la señal de shutdown en los loops (usa tokio::select! con biased si quieres comprobar la señal antes). Para asegurar que las tareas en vuelo terminen:

  • Mantén handles JoinHandle si necesitas .await y verificar errores.
  • Si quieres terminar inmediatamente y darles X ms a los workers para limpiar, combina broadcast + tokio::time::timeout.

Manejo de operaciones bloqueantes

Si process_job hace trabajo CPU intensivo o bloqueo de sistema (p. ej. acceso a librerías C), usa tokio::task::spawn_blocking para evitar bloquear el runtime:

let result = tokio::task::spawn_blocking(move || {
    // trabajo CPU / bloqueante aquí
}).await?;

Pruebas

Puedes probar el pipeline con tokio::test. Para flujos temporales usa tokio::time::pause() y avanzar el tiempo con advance() si necesitas control determinista:

#[tokio::test]
async fn test_producer_backpressure() {
    tokio::time::pause();
    // configurar pipeline con canal pequeño
    let (tx, mut rx) = mpsc::channel(2);
    // enviar 3 mensajes: el tercero debe bloquear hasta que se reciba uno
    let send1 = tx.send(Job { id:1, payload: String::new() });
    let send2 = tx.send(Job { id:2, payload: String::new() });
    let send3 = tx.send(Job { id:3, payload: String::new() });

    tokio::pin!(send1);
    tokio::pin!(send2);
    tokio::pin!(send3);

    // advance para permitir que las send progresen si es necesario
    tokio::task::yield_now().await;
    assert!(!send3.as_mut().is_ready()); // el tercero no debe estar listo porque canal está lleno

    // recibir uno y desbloquear
    let _ = rx.recv().await;
    // ahora el tercero puede completarse
    let _ = send3.await;
}

Errores y trampas comunes

  • No ignores el caso en que send() devuelve Err: significa que todos los receptores se han cerrado.
  • No uses un buffer desmesurado para "evitar backpressure"; a largo plazo solo cambia memoria por latencia y pérdida de control.
  • No mezcles lógica de cancelación en muchas partes sin un mecanismo centralizado: usa una señal (broadcast/watch) o contextos claros.

Si necesitas priorización, reemplaza el canal por una estructura con múltiples colas y un pequeño scheduler que extraiga según prioridad.

Consejo avanzado: instrumenta con tracing y añade contadores (prometheus) para queue_length, latencias de procesamiento y permisos en uso (semaphore permits). Para escenarios de seguridad, valida y sanea el payload antes de encolarlo y considera límites de tasa por productor para evitar DoS interno.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión