Mejorar concurrencia en Java: CompletableFuture con ThreadPoolExecutor (guía práctica)
En aplicaciones modernas es habitual ejecutar muchas tareas concurrentes, a menudo bloqueantes (I/O, llamadas HTTP, operaciones en base de datos). Usar CompletableFuture con el ForkJoinPool.commonPool por defecto suele funcionar, pero falla cuando las tareas bloquean: el pool se puede agotar y la latencia se dispara. Aquí verás cómo crear y usar un ThreadPoolExecutor dedicado, componer CompletableFutures correctamente, manejar errores, timeouts y cómo inspeccionar el pool en producción.
Por qué evitar el commonPool para I/O bloqueante
ForkJoinPool está optimizado para tareas cortas y recursivas; cuando una tarea bloquea, consume un worker que no se reemplaza eficientemente. Resultado: menos throughput y latencias peores. Para I/O bloqueante usa un pool con hilos dedicados y cola acotada.
Pool dedicado y factory con nombre
Un ThreadPoolExecutor configurable te da control sobre el tamaño, la cola y la política de rechazo. Nombra los hilos para facilitar debugging y métricas.
public static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger idx = new AtomicInteger();
private final String pattern;
public NamedThreadFactory(String pattern){ this.pattern = pattern; }
@Override
public Thread newThread(Runnable r){
Thread t = new Thread(r, String.format(pattern, idx.getAndIncrement()));
t.setDaemon(false);
return t;
}
}
// Configuración típica para tareas I/O-bound
ThreadPoolExecutor executor = new ThreadPoolExecutor(
50, // corePoolSize
200, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("io-pool-%d"),
new ThreadPoolExecutor.AbortPolicy() // falla rápido si la cola está llena
);
Ejemplo práctico: peticiones HTTP concurrentes (Java 11+ HttpClient)
Este ejemplo muestra cómo lanzar múltiples peticiones HTTP usando el pool dedicado y recoger resultados con CompletableFuture.
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
CompletableFuture fetch(String url) {
HttpRequest req = HttpRequest.newBuilder(URI.create(url)).GET().build();
return CompletableFuture.supplyAsync(() -> {
try {
return client.send(req, HttpResponse.BodyHandlers.ofString()).body();
} catch (IOException | InterruptedException e) {
throw new CompletionException(e);
}
}, executor)
.orTimeout(10, TimeUnit.SECONDS) // cancela si tarda demasiado (Java 9+)
.exceptionally(ex -> {
System.err.println("Fetch failed for " + url + ": " + ex.toString());
return null; // o fallback
});
}
List urls = List.of("https://example.com", "https://api.example.org/1", "https://api.example.org/2");
List> futures = urls.stream()
.map(this::fetch)
.collect(Collectors.toList());
CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture> allResults = all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // seguro porque 'all' completó
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
allResults.thenAccept(results -> {
// procesar resultados
System.out.println("Recibidos " + results.size() + " respuestas");
});
Manejo de errores, timeouts y cancelación
Usa orTimeout/completeOnTimeout para imponer límites. Usa exceptionally/handle para capturar excepciones y proporcionar fallback. Si necesitas cancelar, llama a cancel(true) sobre el CompletableFuture.
// Timeout y fallback
CompletableFuture f = fetch(url)
.completeOnTimeout("FALLBACK", 10, TimeUnit.SECONDS);
// Cancelar si alguna condición externa ocurre
if (shouldCancel) {
f.cancel(true);
}
Composición: thenCompose y combinar resultados
Para tareas dependientes encadena con thenCompose; para combinar resultados paralelos usa thenCombine o allOf.
// Secuencial: obtener token luego llamar API con ese token
CompletableFuture tokenF = CompletableFuture.supplyAsync(() -> getToken(), executor);
CompletableFuture userF = tokenF.thenCompose(token ->
CompletableFuture.supplyAsync(() -> callApiWithToken(token), executor)
);
// Paralelo: combinar dos llamadas independientes
CompletableFuture a = ...; // compute A
CompletableFuture b = ...; // compute B
CompletableFuture sum = a.thenCombine(b, Integer::sum);
Inspección y métricas del pool
En producción monitoriza: activeCount, queue size, completedTaskCount y rejectedCount (implementa un handler personalizado para contar rechazos).
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
System.out.println("Active: " + tpe.getActiveCount());
System.out.println("Queue size: " + tpe.getQueue().size());
System.out.println("Completed: " + tpe.getCompletedTaskCount());
}
// Shutdown correcto
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
Buenas prácticas
- Usa un pool separado para I/O bloqueante; no mezcles con CPU-bound.
- Configura una cola acotada y una política de rechazo que haga sentido (AbortPolicy para fail-fast, CallerRunsPolicy para degradación controlada).
- Registra métricas y expón límites: maxPool, queueCapacity, latencias p50/p95/p99.
- No llames join() en el hilo del pool para evitar deadlocks; usa composición asíncrona.
- Asegúrate de shutdown en hooks o durante deploys para evitar fugas de hilos.
Si quieres evolucionar: considera dividir pools por tipo de tarea (red, base de datos, tareas batch) y añade circuit breakers y backpressure (ej. Resilience4j) para evitar que una dependencia lenta tumbe tu sistema.
Consejo avanzado: en entornos con soporte para Project Loom, prueba Executors.newVirtualThreadPerTaskExecutor() para simplificar el modelo de concurrencia y reducir la necesidad de dimensionar pools para I/O intenso; mientras tanto, protege siempre tu pool dedicado con cola acotada, handler de rechazo y métricas para detectar saturación temprana.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación