Mejorar concurrencia en Java: CompletableFuture con ThreadPoolExecutor (guía práctica)

java Mejorar concurrencia en Java: CompletableFuture con ThreadPoolExecutor (guía práctica)

Mejorar concurrencia en Java: CompletableFuture con ThreadPoolExecutor (guía práctica)

En aplicaciones modernas es habitual ejecutar muchas tareas concurrentes, a menudo bloqueantes (I/O, llamadas HTTP, operaciones en base de datos). Usar CompletableFuture con el ForkJoinPool.commonPool por defecto suele funcionar, pero falla cuando las tareas bloquean: el pool se puede agotar y la latencia se dispara. Aquí verás cómo crear y usar un ThreadPoolExecutor dedicado, componer CompletableFutures correctamente, manejar errores, timeouts y cómo inspeccionar el pool en producción.

Por qué evitar el commonPool para I/O bloqueante

ForkJoinPool está optimizado para tareas cortas y recursivas; cuando una tarea bloquea, consume un worker que no se reemplaza eficientemente. Resultado: menos throughput y latencias peores. Para I/O bloqueante usa un pool con hilos dedicados y cola acotada.

Pool dedicado y factory con nombre

Un ThreadPoolExecutor configurable te da control sobre el tamaño, la cola y la política de rechazo. Nombra los hilos para facilitar debugging y métricas.

public static class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger idx = new AtomicInteger();
    private final String pattern;
    public NamedThreadFactory(String pattern){ this.pattern = pattern; }
    @Override
    public Thread newThread(Runnable r){
        Thread t = new Thread(r, String.format(pattern, idx.getAndIncrement()));
        t.setDaemon(false);
        return t;
    }
}

// Configuración típica para tareas I/O-bound
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    50,                    // corePoolSize
    200,                   // maximumPoolSize
    60L, TimeUnit.SECONDS, // keepAliveTime
    new LinkedBlockingQueue<>(1000),
    new NamedThreadFactory("io-pool-%d"),
    new ThreadPoolExecutor.AbortPolicy() // falla rápido si la cola está llena
);

Ejemplo práctico: peticiones HTTP concurrentes (Java 11+ HttpClient)

Este ejemplo muestra cómo lanzar múltiples peticiones HTTP usando el pool dedicado y recoger resultados con CompletableFuture.

HttpClient client = HttpClient.newBuilder()
    .connectTimeout(Duration.ofSeconds(5))
    .build();

CompletableFuture fetch(String url) {
    HttpRequest req = HttpRequest.newBuilder(URI.create(url)).GET().build();

    return CompletableFuture.supplyAsync(() -> {
        try {
            return client.send(req, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException | InterruptedException e) {
            throw new CompletionException(e);
        }
    }, executor)
    .orTimeout(10, TimeUnit.SECONDS) // cancela si tarda demasiado (Java 9+)
    .exceptionally(ex -> {
        System.err.println("Fetch failed for " + url + ": " + ex.toString());
        return null; // o fallback
    });
}

List urls = List.of("https://example.com", "https://api.example.org/1", "https://api.example.org/2");
List> futures = urls.stream()
    .map(this::fetch)
    .collect(Collectors.toList());

CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture> allResults = all.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join) // seguro porque 'all' completó
        .filter(Objects::nonNull)
        .collect(Collectors.toList())
);

allResults.thenAccept(results -> {
    // procesar resultados
    System.out.println("Recibidos " + results.size() + " respuestas");
});

Manejo de errores, timeouts y cancelación

Usa orTimeout/completeOnTimeout para imponer límites. Usa exceptionally/handle para capturar excepciones y proporcionar fallback. Si necesitas cancelar, llama a cancel(true) sobre el CompletableFuture.

// Timeout y fallback
CompletableFuture f = fetch(url)
    .completeOnTimeout("FALLBACK", 10, TimeUnit.SECONDS);

// Cancelar si alguna condición externa ocurre
if (shouldCancel) {
    f.cancel(true);
}

Composición: thenCompose y combinar resultados

Para tareas dependientes encadena con thenCompose; para combinar resultados paralelos usa thenCombine o allOf.

// Secuencial: obtener token luego llamar API con ese token
CompletableFuture tokenF = CompletableFuture.supplyAsync(() -> getToken(), executor);
CompletableFuture userF = tokenF.thenCompose(token ->
    CompletableFuture.supplyAsync(() -> callApiWithToken(token), executor)
);

// Paralelo: combinar dos llamadas independientes
CompletableFuture a = ...; // compute A
CompletableFuture b = ...; // compute B
CompletableFuture sum = a.thenCombine(b, Integer::sum);

Inspección y métricas del pool

En producción monitoriza: activeCount, queue size, completedTaskCount y rejectedCount (implementa un handler personalizado para contar rechazos).

if (executor instanceof ThreadPoolExecutor) {
    ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
    System.out.println("Active: " + tpe.getActiveCount());
    System.out.println("Queue size: " + tpe.getQueue().size());
    System.out.println("Completed: " + tpe.getCompletedTaskCount());
}

// Shutdown correcto
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
    executor.shutdownNow();
}

Buenas prácticas

  • Usa un pool separado para I/O bloqueante; no mezcles con CPU-bound.
  • Configura una cola acotada y una política de rechazo que haga sentido (AbortPolicy para fail-fast, CallerRunsPolicy para degradación controlada).
  • Registra métricas y expón límites: maxPool, queueCapacity, latencias p50/p95/p99.
  • No llames join() en el hilo del pool para evitar deadlocks; usa composición asíncrona.
  • Asegúrate de shutdown en hooks o durante deploys para evitar fugas de hilos.

Si quieres evolucionar: considera dividir pools por tipo de tarea (red, base de datos, tareas batch) y añade circuit breakers y backpressure (ej. Resilience4j) para evitar que una dependencia lenta tumbe tu sistema.

Consejo avanzado: en entornos con soporte para Project Loom, prueba Executors.newVirtualThreadPerTaskExecutor() para simplificar el modelo de concurrencia y reducir la necesidad de dimensionar pools para I/O intenso; mientras tanto, protege siempre tu pool dedicado con cola acotada, handler de rechazo y métricas para detectar saturación temprana.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión