APIs reactivas en Java con Spring WebFlux: ejemplo práctico con R2dbc
En este post vas a ver cómo montar una API REST no bloqueante con Spring WebFlux (Spring Boot 3) usando R2DBC para PostgreSQL. El objetivo es mostrar la estructura mínima, patrones para no bloquear el hilo reactivo y cómo manejar timeouts y backpressure de forma práctica.
Requisitos
- Java 17+
- Spring Boot 3.x
- PostgreSQL + driver R2DBC
- Gradle o Maven
Dependencias clave (build.gradle.kts)
plugins {
id("org.springframework.boot") version "3.1.0"
kotlin("jvm") version "1.8.0" // opcional si usas Kotlin
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
runtimeOnly("io.r2dbc:r2dbc-postgresql")
implementation("io.projectreactor:reactor-core")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
}
Configuración mínima (application.yml)
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: myuser
password: mypass
sql:
init:
mode: never
# Ajustes reactivos útiles
server:
reactive:
max-http-header-size: 8192
Entidad y repositorio reactivo
Usa interfaces de Spring Data R2DBC para exponer Mono/Flux.
package com.example.demo.model;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
@Table("items")
public class Item {
@Id
private Long id;
private String name;
// getters y setters...
}
package com.example.demo.repository;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
public interface ItemRepository extends ReactiveCrudRepository- {
Flux
- findByNameContainingIgnoreCase(String name);
}
Service: lógica reactiva sin bloquear
No uses operaciones bloqueantes (jdbc, Thread.sleep, llamadas sin adaptar). Si necesitas ejecutar tarea bloqueante, usa Schedulers.boundedElastic() o un Executor dedicado.
package com.example.demo.service;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class ItemService {
private final ItemRepository repo;
public ItemService(ItemRepository repo) {
this.repo = repo;
}
public Flux- listAll() {
return repo.findAll()
.onBackpressureBuffer(1000, dropped -> {
// logging: elemento descartado por exceso de backpressure
});
}
public Mono
- findById(Long id) {
return repo.findById(id);
}
public Mono
- create(Item item) {
item.setId(null);
return repo.save(item);
}
}
Controller WebFlux
Ejemplo usando anotaciones. También podrías usar functional endpoints si prefieres.
package com.example.demo.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@RestController
@RequestMapping("/api/items")
public class ItemController {
private final ItemService service;
public ItemController(ItemService service) {
this.service = service;
}
@GetMapping(produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux- streamAll() {
// NDJSON permite streaming de objetos JSON
return service.listAll()
.delayElements(Duration.ofMillis(10)); // ejemplo de ritmo
}
@GetMapping(path = "/{id}")
public Mono
- getOne(@PathVariable Long id) {
// aplica timeout a operaciones que no deberían bloquear indefinidamente
return service.findById(id)
.timeout(Duration.ofSeconds(2));
}
@PostMapping
public Mono
- create(@RequestBody Item item) {
return service.create(item);
}
}
Manejo de errores y timeouts
- Usa
timeout()en operaciones que dependen de I/O externo para volver rápido al caller. - Evita convertir a bloqueante con
block()dentro del flujo. Si necesitas interoperar, hazlo enSchedulers.boundedElastic(). - Para control centralizado de errores en WebFlux, implementa
@ControllerAdviceque devuelvaMono<ServerResponse>o maneje exceptions típicas.
// ejemplo simple de excepción a nivel controlador
@ControllerAdvice
public class GlobalErrorHandler {
@ExceptionHandler(TimeoutException.class)
public Mono> handleTimeout(TimeoutException ex) {
return Mono.just(ResponseEntity.status(504).body("Request timeout"));
}
}
Pruebas con WebTestClient
@SpringBootTest
@AutoConfigureWebTestClient
public class ItemControllerTest {
@Autowired
WebTestClient client;
@Test
void streamWorks() {
client.get().uri("/api/items")
.accept(MediaType.APPLICATION_NDJSON)
.exchange()
.expectStatus().isOk()
.returnResult(Item.class)
.getResponseBody()
.take(5)
.collectList()
.block(); // sólo en tests
}
}
Buenas prácticas rápidas
- No uses JDBC ni llamadas bloqueantes en hilos de Netty.
- Mide y ajusta el pool R2DBC y la configuración de conexiones simultáneas del DB.
- Aplica backpressure: limita buffers, usa operadores como
onBackpressureBufferoonBackpressureDrop. - Controla timeouts y circuit breakers (Resilience4j tiene integración para Reactor).
Con esto tienes una base práctica para empezar una API reactiva con Spring WebFlux y R2DBC. Si tu carga incluye operaciones fuertemente bloqueantes (p. ej. bibliotecas legacy), encapsula esas llamadas en Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) y monitorea latencia y uso de hilos.
Consejo avanzado: instrumenta tu pipeline Reactor con hooks y operators como doOnEach, elapsed() y metrics() para identificar cuellos de botella reactivos; evita operaciones que fan-out masivo sin límites y considera circuit breakers por downstreams críticos.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación