Patrón práctico: pipeline asíncrono en Rust con Tokio, backpressure y cancelación
En este post veremos cómo construir un pipeline asíncrono y robusto en Rust usando tokio. Cubriremos:
- Cómo aplicar backpressure con canales acotados (
mpsc::channel). - Cómo limitar concurrencia con
Semaphorepara procesamiento paralelo controlado. - Cancelación y apagado ordenado con señales (
broadcast/oneshot). - Buenas prácticas: spawn_blocking, tracing y tests con
tokio::test.
Escenario
Tienes un productor que genera trabajos, un pool de workers que los procesan (I/O o CPU bounded), y quieres:
- Evitar que el productor inunde memoria (backpressure).
- Poder cancelar todo rápidamente (shutdown).
- Limitar la concurrencia de procesamiento para no agotar recursos.
Esqueleto del pipeline
Usaremos un canal mpsc acotado para backpressure, un Semaphore para limitar el paralelismo y un canal broadcast para anunciar shutdown a todos.
use tokio::sync::{broadcast, mpsc, Semaphore};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
#[derive(Debug)]
struct Job { id: u64, payload: String }
async fn producer(mut tx: mpsc::Sender, mut shutdown_rx: broadcast::Receiver<()>) {
let mut id = 0u64;
loop {
// Puedes usar select! para priorizar shutdown
tokio::select! {
biased; // comprueba shutdown primero
_ = shutdown_rx.recv() => {
tracing::info!("producer: received shutdown");
break;
}
_ = sleep(Duration::from_millis(50)) => {
id += 1;
let job = Job { id, payload: format!("payload-{}", id) };
// Si el canal está lleno, await aquí aplica backpressure al productor
if let Err(_) = tx.send(job).await {
tracing::warn!("producer: receiver dropped, exiting");
break;
}
}
}
}
tracing::info!("producer: exiting");
}
async fn worker(
mut rx: mpsc::Receiver,
shutdown_tx: broadcast::Sender<()>,
concurrency_limit: Arc
) {
while let Some(job) = rx.recv().await {
// Antes de procesar, adquiere un permiso (limita concurrencia)
let permit = concurrency_limit.acquire().await.unwrap();
let shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
// Si la tarea hace I/O bloqueante, usar spawn_blocking
process_job(job).await;
drop(permit); // libera permit
// opcional: emitir métricas o eventos
if false {
// ejemplo de uso: notificar algo
let _ = shutdown_tx.send(());
}
});
}
tracing::info!("worker: receiver closed, exiting");
}
async fn process_job(job: Job) {
// Simula trabajo I/O-bound
tracing::info!("processing {}", job.id);
sleep(Duration::from_millis(200)).await;
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
// Canal acotado: backpressure si producer produce más rápido de lo que los workers procesan
let (tx, rx) = mpsc::channel(16);
let (shutdown_tx, _) = broadcast::channel(1);
let concurrency_limit = Arc::new(Semaphore::new(4));
// Lanza worker(s) que consumirán del mismo receptor: clona el receiver si quieres particionar
let worker_rx = rx; // en el ejemplo usamos un solo receptor y spawn para procesar en paralelo
let producer_shutdown_rx = shutdown_tx.subscribe();
let prod = tokio::spawn(producer(tx.clone(), producer_shutdown_rx));
let work = tokio::spawn(worker(worker_rx, shutdown_tx.clone(), concurrency_limit.clone()));
// Ejecuta un tiempo y luego solicita shutdown
tokio::time::sleep(Duration::from_secs(5)).await;
// Envía la señal (puede fallar si no hay listeners)
let _ = shutdown_tx.send(());
// Espera a que terminen
let _ = prod.await;
let _ = work.await;
tracing::info!("main: finished");
}
Explicación rápida
- mpsc::channel(16): un canal de capacidad 16 ofrece backpressure: cuando está lleno,
send().awaitespera hasta que haya espacio. - Semaphore limita el número simultáneo de tareas reales en ejecución (aquí 4).
- broadcast::channel se usa para anunciar shutdown a múltiples listeners;
subscribe()crea receptores independientes. - spawn para cada unidad de trabajo permite que el receptor sea rápido en aceptar nuevos mensajes y delegar procesamiento.
Cancelación y shutdown ordenado
La clave es priorizar la recepción de la señal de shutdown en los loops (usa tokio::select! con biased si quieres comprobar la señal antes). Para asegurar que las tareas en vuelo terminen:
- Mantén handles
JoinHandlesi necesitas.awaity verificar errores. - Si quieres terminar inmediatamente y darles X ms a los workers para limpiar, combina
broadcast+tokio::time::timeout.
Manejo de operaciones bloqueantes
Si process_job hace trabajo CPU intensivo o bloqueo de sistema (p. ej. acceso a librerías C), usa tokio::task::spawn_blocking para evitar bloquear el runtime:
let result = tokio::task::spawn_blocking(move || {
// trabajo CPU / bloqueante aquí
}).await?;
Pruebas
Puedes probar el pipeline con tokio::test. Para flujos temporales usa tokio::time::pause() y avanzar el tiempo con advance() si necesitas control determinista:
#[tokio::test]
async fn test_producer_backpressure() {
tokio::time::pause();
// configurar pipeline con canal pequeño
let (tx, mut rx) = mpsc::channel(2);
// enviar 3 mensajes: el tercero debe bloquear hasta que se reciba uno
let send1 = tx.send(Job { id:1, payload: String::new() });
let send2 = tx.send(Job { id:2, payload: String::new() });
let send3 = tx.send(Job { id:3, payload: String::new() });
tokio::pin!(send1);
tokio::pin!(send2);
tokio::pin!(send3);
// advance para permitir que las send progresen si es necesario
tokio::task::yield_now().await;
assert!(!send3.as_mut().is_ready()); // el tercero no debe estar listo porque canal está lleno
// recibir uno y desbloquear
let _ = rx.recv().await;
// ahora el tercero puede completarse
let _ = send3.await;
}
Errores y trampas comunes
- No ignores el caso en que
send()devuelve Err: significa que todos los receptores se han cerrado. - No uses un buffer desmesurado para "evitar backpressure"; a largo plazo solo cambia memoria por latencia y pérdida de control.
- No mezcles lógica de cancelación en muchas partes sin un mecanismo centralizado: usa una señal (broadcast/watch) o contextos claros.
Si necesitas priorización, reemplaza el canal por una estructura con múltiples colas y un pequeño scheduler que extraiga según prioridad.
Consejo avanzado: instrumenta con tracing y añade contadores (prometheus) para queue_length, latencias de procesamiento y permisos en uso (semaphore permits). Para escenarios de seguridad, valida y sanea el payload antes de encolarlo y considera límites de tasa por productor para evitar DoS interno.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación