APIs reactivas en Java con Spring WebFlux: ejemplo práctico con R2dbc

java APIs reactivas en Java con Spring WebFlux: ejemplo práctico con R2dbc

APIs reactivas en Java con Spring WebFlux: ejemplo práctico con R2dbc

En este post vas a ver cómo montar una API REST no bloqueante con Spring WebFlux (Spring Boot 3) usando R2DBC para PostgreSQL. El objetivo es mostrar la estructura mínima, patrones para no bloquear el hilo reactivo y cómo manejar timeouts y backpressure de forma práctica.

Requisitos

  • Java 17+
  • Spring Boot 3.x
  • PostgreSQL + driver R2DBC
  • Gradle o Maven

Dependencias clave (build.gradle.kts)

plugins {
    id("org.springframework.boot") version "3.1.0"
    kotlin("jvm") version "1.8.0" // opcional si usas Kotlin
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    runtimeOnly("io.r2dbc:r2dbc-postgresql")
    implementation("io.projectreactor:reactor-core")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
}

Configuración mínima (application.yml)

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypass
  sql:
    init:
      mode: never

# Ajustes reactivos útiles
server:
  reactive:
    max-http-header-size: 8192

Entidad y repositorio reactivo

Usa interfaces de Spring Data R2DBC para exponer Mono/Flux.

package com.example.demo.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Table("items")
public class Item {
    @Id
    private Long id;
    private String name;

    // getters y setters...
}

package com.example.demo.repository;

import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;

public interface ItemRepository extends ReactiveCrudRepository {
    Flux findByNameContainingIgnoreCase(String name);
}

Service: lógica reactiva sin bloquear

No uses operaciones bloqueantes (jdbc, Thread.sleep, llamadas sin adaptar). Si necesitas ejecutar tarea bloqueante, usa Schedulers.boundedElastic() o un Executor dedicado.

package com.example.demo.service;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class ItemService {
    private final ItemRepository repo;

    public ItemService(ItemRepository repo) {
        this.repo = repo;
    }

    public Flux listAll() {
        return repo.findAll()
                   .onBackpressureBuffer(1000, dropped -> {
                       // logging: elemento descartado por exceso de backpressure
                   });
    }

    public Mono findById(Long id) {
        return repo.findById(id);
    }

    public Mono create(Item item) {
        item.setId(null);
        return repo.save(item);
    }
}

Controller WebFlux

Ejemplo usando anotaciones. También podrías usar functional endpoints si prefieres.

package com.example.demo.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;

@RestController
@RequestMapping("/api/items")
public class ItemController {
    private final ItemService service;

    public ItemController(ItemService service) {
        this.service = service;
    }

    @GetMapping(produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux streamAll() {
        // NDJSON permite streaming de objetos JSON
        return service.listAll()
                      .delayElements(Duration.ofMillis(10)); // ejemplo de ritmo
    }

    @GetMapping(path = "/{id}")
    public Mono getOne(@PathVariable Long id) {
        // aplica timeout a operaciones que no deberían bloquear indefinidamente
        return service.findById(id)
                      .timeout(Duration.ofSeconds(2));
    }

    @PostMapping
    public Mono create(@RequestBody Item item) {
        return service.create(item);
    }
}

Manejo de errores y timeouts

  • Usa timeout() en operaciones que dependen de I/O externo para volver rápido al caller.
  • Evita convertir a bloqueante con block() dentro del flujo. Si necesitas interoperar, hazlo en Schedulers.boundedElastic().
  • Para control centralizado de errores en WebFlux, implementa @ControllerAdvice que devuelva Mono<ServerResponse> o maneje exceptions típicas.
// ejemplo simple de excepción a nivel controlador
@ControllerAdvice
public class GlobalErrorHandler {
    @ExceptionHandler(TimeoutException.class)
    public Mono> handleTimeout(TimeoutException ex) {
        return Mono.just(ResponseEntity.status(504).body("Request timeout"));
    }
}

Pruebas con WebTestClient

@SpringBootTest
@AutoConfigureWebTestClient
public class ItemControllerTest {
    @Autowired
    WebTestClient client;

    @Test
    void streamWorks() {
        client.get().uri("/api/items")
              .accept(MediaType.APPLICATION_NDJSON)
              .exchange()
              .expectStatus().isOk()
              .returnResult(Item.class)
              .getResponseBody()
              .take(5)
              .collectList()
              .block(); // sólo en tests
    }
}

Buenas prácticas rápidas

  • No uses JDBC ni llamadas bloqueantes en hilos de Netty.
  • Mide y ajusta el pool R2DBC y la configuración de conexiones simultáneas del DB.
  • Aplica backpressure: limita buffers, usa operadores como onBackpressureBuffer o onBackpressureDrop.
  • Controla timeouts y circuit breakers (Resilience4j tiene integración para Reactor).

Con esto tienes una base práctica para empezar una API reactiva con Spring WebFlux y R2DBC. Si tu carga incluye operaciones fuertemente bloqueantes (p. ej. bibliotecas legacy), encapsula esas llamadas en Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) y monitorea latencia y uso de hilos.

Consejo avanzado: instrumenta tu pipeline Reactor con hooks y operators como doOnEach, elapsed() y metrics() para identificar cuellos de botella reactivos; evita operaciones que fan-out masivo sin límites y considera circuit breakers por downstreams críticos.

Comentarios
¿Quieres comentar?

Inicia sesión con Telegram para participar en la conversación


Comentarios (0)

Aún no hay comentarios. ¡Sé el primero en comentar!

Iniciar Sesión