Building Non-Blocking, Asynchronous Database Applications
Article
R2DBC (Reactive Relational Database Connectivity) is a reactive programming API for SQL databases that provides fully non-blocking database access. Unlike traditional JDBC which blocks threads during database operations, R2DBC enables true reactive programming with backpressure support, making it ideal for high-throughput, low-latency applications.
R2DBC Architecture Overview
Key Components:
- ConnectionFactory: Creates database connections reactively
- Connection: Reactive database connection
- Statement: Reactive SQL statement execution
- Result: Reactive result set processing
- Transaction: Reactive transaction management
R2DBC vs JDBC:
- Non-blocking I/O: No thread blocking during database operations
- Backpressure Support: Controlled data flow between producer and consumer
- Reactive Streams: Built on Reactive Streams specification (Publisher/Subscriber)
- Functional API: Fluent, functional programming style
1. Project Setup and Dependencies
Maven Configuration (pom.xml):
<properties>
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
<postgresql.version>1.0.0.RELEASE</postgresql.version>
<reactor.version>3.4.0</reactor.version>
<spring-boot.version>2.7.0</spring-boot.version>
</properties>
<dependencies>
<!-- R2DBC Core -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>${r2dbc.version}</version>
</dependency>
<!-- R2DBC Pool -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>${r2dbc.version}</version>
</dependency>
<!-- PostgreSQL Driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<!-- MySQL Driver (alternative) -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- H2 Database (for testing) -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>${r2dbc.version}</version>
<scope>test</scope>
</dependency>
<!-- Reactor Core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
<!-- Spring Boot R2DBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
2. Core R2DBC Configuration
Connection Factory Setup:
package com.example.r2dbc.config;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import java.time.Duration;
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
// PostgreSQL configuration
PostgresqlConnectionConfiguration connectionConfig = PostgresqlConnectionConfiguration.builder()
.host("localhost")
.port(5432)
.database("reactive_db")
.username("reactive_user")
.password("reactive_pass")
.codecRegistrar(PostgresqlConnectionConfiguration.builder().build().getCodecRegistrar())
.build();
// Connection pool configuration
ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder()
.connectionFactory(new PostgresqlConnectionFactory(connectionConfig))
.name("reactive-pool")
.initialSize(5)
.maxSize(20)
.maxIdleTime(Duration.ofMinutes(30))
.maxAcquireTime(Duration.ofSeconds(30))
.maxCreateConnectionTime(Duration.ofSeconds(10))
.validationQuery("SELECT 1")
.build();
return new ConnectionPool(poolConfig);
}
}
// Alternative: Programmatic configuration without Spring
class ManualR2dbcConfig {
public ConnectionFactory createConnectionFactory() {
return new PostgresqlConnectionFactory(
PostgresqlConnectionConfiguration.builder()
.host("localhost")
.port(5432)
.database("reactive_db")
.username("reactive_user")
.password("reactive_pass")
.build()
);
}
public ConnectionPool createConnectionPool(ConnectionFactory connectionFactory) {
ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder()
.connectionFactory(connectionFactory)
.maxSize(20)
.maxIdleTime(Duration.ofMinutes(30))
.build();
return new ConnectionPool(poolConfig);
}
}
Database Initialization:
package com.example.r2dbc.config;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;
import reactor.core.publisher.Mono;
@Configuration
public class DatabaseInitializer {
private static final Logger log = LoggerFactory.getLogger(DatabaseInitializer.class);
@Bean
public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
initializer.setConnectionFactory(connectionFactory);
ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(new ClassPathResource("schema.sql"));
populator.addScript(new ClassPathResource("data.sql"));
initializer.setDatabasePopulator(populator);
return initializer;
}
@Bean
public CommandLineRunner initDatabase(ConnectionFactory connectionFactory) {
return args -> {
Mono.from(connectionFactory.create())
.flatMapMany(connection ->
connection.createStatement(
"""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
stock_quantity INTEGER DEFAULT 0,
active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
).execute()
)
.then()
.subscribe(
nil -> log.info("Database schema created successfully"),
error -> log.error("Error creating database schema: {}", error.getMessage())
);
};
}
}
3. Entity Classes
package com.example.r2dbc.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Objects;
@Table("users")
public class User {
@Id
private Long id;
private String email;
private String name;
private Integer age;
@Column("created_at")
private LocalDateTime createdAt;
@Column("updated_at")
private LocalDateTime updatedAt;
// Constructors
public User() {}
public User(String email, String name, Integer age) {
this.email = email;
this.name = name;
this.age = age;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
// Getters and Setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; }
public LocalDateTime getCreatedAt() { return createdAt; }
public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
public void setUpdatedAt(LocalDateTime updatedAt) { this.updatedAt = updatedAt; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
User user = (User) o;
return Objects.equals(id, user.id) &&
Objects.equals(email, user.email);
}
@Override
public int hashCode() {
return Objects.hash(id, email);
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", email='" + email + '\'' +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
@Table("products")
public class Product {
@Id
private Long id;
private String name;
private String description;
private BigDecimal price;
@Column("stock_quantity")
private Integer stockQuantity;
private Boolean active;
@Column("created_at")
private LocalDateTime createdAt;
// Constructors
public Product() {}
public Product(String name, String description, BigDecimal price, Integer stockQuantity) {
this.name = name;
this.description = description;
this.price = price;
this.stockQuantity = stockQuantity;
this.active = true;
this.createdAt = LocalDateTime.now();
}
// Getters and Setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
public BigDecimal getPrice() { return price; }
public void setPrice(BigDecimal price) { this.price = price; }
public Integer getStockQuantity() { return stockQuantity; }
public void setStockQuantity(Integer stockQuantity) { this.stockQuantity = stockQuantity; }
public Boolean getActive() { return active; }
public void setActive(Boolean active) { this.active = active; }
public LocalDateTime getCreatedAt() { return createdAt; }
public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
}
@Table("orders")
public class Order {
@Id
private Long id;
@Column("user_id")
private Long userId;
@Column("total_amount")
private BigDecimal totalAmount;
private String status;
@Column("created_at")
private LocalDateTime createdAt;
// Constructors
public Order() {}
public Order(Long userId, BigDecimal totalAmount, String status) {
this.userId = userId;
this.totalAmount = totalAmount;
this.status = status;
this.createdAt = LocalDateTime.now();
}
// Getters and Setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
public LocalDateTime getCreatedAt() { return createdAt; }
public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
}
4. Repository Layer with Spring Data R2DBC
package com.example.r2dbc.repository;
import com.example.r2dbc.entity.User;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
// Basic query methods
Mono<User> findByEmail(String email);
Flux<User> findByNameContainingIgnoreCase(String name);
Flux<User> findByAgeGreaterThan(Integer age);
Flux<User> findByAgeBetween(Integer minAge, Integer maxAge);
// Custom query with @Query
@Query("SELECT * FROM users WHERE age >= :minAge AND age <= :maxAge ORDER BY created_at DESC")
Flux<User> findUsersByAgeRange(Integer minAge, Integer maxAge);
@Query("SELECT COUNT(*) FROM users WHERE age > :age")
Mono<Long> countUsersOlderThan(Integer age);
@Query("UPDATE users SET name = :name, updated_at = CURRENT_TIMESTAMP WHERE id = :id")
Mono<Long> updateUserName(Long id, String name);
// Complex query with joins (if needed)
@Query("""
SELECT u.* FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE o.total_amount > :minAmount
""")
Flux<User> findUsersWithOrdersAboveAmount(BigDecimal minAmount);
}
@Repository
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {
Flux<Product> findByActiveTrue();
Flux<Product> findByPriceBetween(BigDecimal minPrice, BigDecimal maxPrice);
Flux<Product> findByStockQuantityGreaterThan(Integer quantity);
Mono<Product> findByName(String name);
@Query("UPDATE products SET stock_quantity = stock_quantity - :quantity WHERE id = :id AND stock_quantity >= :quantity")
Mono<Long> decreaseStock(Long id, Integer quantity);
@Query("SELECT * FROM products WHERE LOWER(name) LIKE LOWER(CONCAT('%', :name, '%'))")
Flux<Product> searchByName(String name);
}
@Repository
public interface OrderRepository extends ReactiveCrudRepository<Order, Long> {
Flux<Order> findByUserId(Long userId);
Flux<Order> findByStatus(String status);
Flux<Order> findByTotalAmountGreaterThan(BigDecimal amount);
@Query("SELECT * FROM orders WHERE user_id = :userId AND status = :status ORDER BY created_at DESC")
Flux<Order> findUserOrdersByStatus(Long userId, String status);
@Query("UPDATE orders SET status = :status WHERE id = :id")
Mono<Long> updateOrderStatus(Long id, String status);
}
5. Custom R2DBC Repository Implementation
For complex operations that can't be expressed with Spring Data:
package com.example.r2dbc.repository.custom;
import com.example.r2dbc.entity.User;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.util.function.BiFunction;
@Repository
public class CustomUserRepository {
private final DatabaseClient databaseClient;
// Row mapper function
private final BiFunction<Row, RowMetadata, User> USER_MAPPER = (row, metadata) -> {
User user = new User();
user.setId(row.get("id", Long.class));
user.setEmail(row.get("email", String.class));
user.setName(row.get("name", String.class));
user.setAge(row.get("age", Integer.class));
user.setCreatedAt(row.get("created_at", LocalDateTime.class));
user.setUpdatedAt(row.get("updated_at", LocalDateTime.class));
return user;
};
public CustomUserRepository(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
public Flux<User> findUsersWithPagination(int page, int size) {
int offset = page * size;
return databaseClient.sql("""
SELECT * FROM users
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
""")
.bind("limit", size)
.bind("offset", offset)
.map(USER_MAPPER)
.all();
}
public Mono<User> upsertUser(User user) {
return databaseClient.sql("""
INSERT INTO users (email, name, age, created_at, updated_at)
VALUES (:email, :name, :age, :createdAt, :updatedAt)
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
age = EXCLUDED.age,
updated_at = EXCLUDED.updated_at
RETURNING *
""")
.bind("email", user.getEmail())
.bind("name", user.getName())
.bind("age", user.getAge())
.bind("createdAt", user.getCreatedAt())
.bind("updatedAt", user.getUpdatedAt())
.map(USER_MAPPER)
.one();
}
public Flux<User> findUsersByComplexCriteria(String namePattern, Integer minAge,
Integer maxAge, LocalDateTime createdAfter) {
return databaseClient.sql("""
SELECT * FROM users
WHERE (LOWER(name) LIKE LOWER(:namePattern))
AND (:minAge IS NULL OR age >= :minAge)
AND (:maxAge IS NULL OR age <= :maxAge)
AND (:createdAfter IS NULL OR created_at >= :createdAfter)
ORDER BY created_at DESC
""")
.bind("namePattern", "%" + namePattern + "%")
.bind("minAge", minAge)
.bind("maxAge", maxAge)
.bind("createdAfter", createdAfter)
.map(USER_MAPPER)
.all();
}
public Mono<Long> bulkUpdateUserAge(Integer oldAge, Integer newAge) {
return databaseClient.sql("UPDATE users SET age = :newAge, updated_at = CURRENT_TIMESTAMP WHERE age = :oldAge")
.bind("newAge", newAge)
.bind("oldAge", oldAge)
.fetch()
.rowsUpdated();
}
public Mono<Boolean> userExistsByEmail(String email) {
return databaseClient.sql("SELECT 1 FROM users WHERE email = :email LIMIT 1")
.bind("email", email)
.map((row, metadata) -> row.get(0, Integer.class))
.one()
.map(count -> count != null)
.defaultIfEmpty(false);
}
}
6. Service Layer with Reactive Programming
package com.example.r2dbc.service;
import com.example.r2dbc.entity.User;
import com.example.r2dbc.repository.UserRepository;
import com.example.r2dbc.repository.custom.CustomUserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
@Service
public class UserService {
private static final Logger log = LoggerFactory.getLogger(UserService.class);
private final UserRepository userRepository;
private final CustomUserRepository customUserRepository;
public UserService(UserRepository userRepository, CustomUserRepository customUserRepository) {
this.userRepository = userRepository;
this.customUserRepository = customUserRepository;
}
public Mono<User> createUser(User user) {
return userRepository.save(user)
.doOnSuccess(u -> log.info("User created: {}", u.getEmail()))
.doOnError(error -> log.error("Error creating user: {}", error.getMessage()));
}
public Mono<User> getUserById(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new RuntimeException("User not found with id: " + id)));
}
public Mono<User> getUserByEmail(String email) {
return userRepository.findByEmail(email)
.switchIfEmpty(Mono.error(new RuntimeException("User not found with email: " + email)));
}
public Flux<User> getAllUsers() {
return userRepository.findAll()
.delayElements(Duration.ofMillis(10)) // Simulate backpressure handling
.doOnNext(user -> log.debug("Processing user: {}", user.getEmail()));
}
public Flux<User> searchUsers(String name, Integer minAge, Integer maxAge) {
return customUserRepository.findUsersByComplexCriteria(name, minAge, maxAge, null)
.doOnSubscribe(subscription -> log.info("Starting user search with criteria: name={}, age={}-{}",
name, minAge, maxAge));
}
public Mono<User> updateUser(Long id, User userUpdate) {
return userRepository.findById(id)
.flatMap(existingUser -> {
existingUser.setName(userUpdate.getName());
existingUser.setAge(userUpdate.getAge());
existingUser.setUpdatedAt(LocalDateTime.now());
return userRepository.save(existingUser);
})
.switchIfEmpty(Mono.error(new RuntimeException("User not found with id: " + id)));
}
public Mono<Void> deleteUser(Long id) {
return userRepository.findById(id)
.flatMap(user -> userRepository.delete(user))
.switchIfEmpty(Mono.error(new RuntimeException("User not found with id: " + id)));
}
public Flux<User> getUsersPaginated(int page, int size) {
return customUserRepository.findUsersWithPagination(page, size);
}
public Mono<Long> countUsers() {
return userRepository.count();
}
@Transactional
public Mono<User> createUserWithValidation(User user) {
return customUserRepository.userExistsByEmail(user.getEmail())
.flatMap(exists -> {
if (exists) {
return Mono.error(new RuntimeException("User with email already exists: " + user.getEmail()));
}
return userRepository.save(user);
});
}
// Batch operations
public Flux<User> createUsersBatch(Flux<User> users) {
return users
.buffer(100) // Process in batches of 100
.flatMap(userRepository::saveAll)
.doOnNext(user -> log.info("Created user: {}", user.getEmail()));
}
// Complex business logic with multiple database operations
@Transactional
public Mono<User> updateUserWithAgeValidation(Long id, User userUpdate) {
return userRepository.findById(id)
.flatMap(existingUser -> {
// Business logic validation
if (userUpdate.getAge() != null && userUpdate.getAge() < 18) {
return Mono.error(new RuntimeException("User must be at least 18 years old"));
}
existingUser.setName(userUpdate.getName());
existingUser.setAge(userUpdate.getAge());
existingUser.setUpdatedAt(LocalDateTime.now());
return userRepository.save(existingUser);
})
.switchIfEmpty(Mono.error(new RuntimeException("User not found with id: " + id)));
}
}
7. Transaction Management
package com.example.r2dbc.service;
import com.example.r2dbc.entity.Order;
import com.example.r2dbc.entity.Product;
import com.example.r2dbc.entity.User;
import com.example.r2dbc.repository.OrderRepository;
import com.example.r2dbc.repository.ProductRepository;
import com.example.r2dbc.repository.UserRepository;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;
import java.math.BigDecimal;
@Service
public class TransactionalService {
private final UserRepository userRepository;
private final ProductRepository productRepository;
private final OrderRepository orderRepository;
private final DatabaseClient databaseClient;
private final TransactionalOperator transactionalOperator;
public TransactionalService(UserRepository userRepository,
ProductRepository productRepository,
OrderRepository orderRepository,
DatabaseClient databaseClient,
ReactiveTransactionManager transactionManager) {
this.userRepository = userRepository;
this.productRepository = productRepository;
this.orderRepository = orderRepository;
this.databaseClient = databaseClient;
this.transactionalOperator = TransactionalOperator.create(transactionManager);
}
// Programmatic transaction management
public Mono<Void> createUserWithOrderTransactional(User user, Order order) {
return transactionalOperator.execute(status ->
userRepository.save(user)
.flatMap(savedUser -> {
order.setUserId(savedUser.getId());
return orderRepository.save(order);
})
.then()
).then();
}
// Manual transaction management
public Mono<Void> transferStock(Long fromProductId, Long toProductId, Integer quantity) {
return databaseClient.inConnectionMany(connection ->
Mono.from(connection.beginTransaction())
.thenMany(connection.createStatement(
"UPDATE products SET stock_quantity = stock_quantity - :quantity WHERE id = :id AND stock_quantity >= :quantity")
.bind("quantity", quantity)
.bind("id", fromProductId)
.execute())
.flatMap(result ->
Mono.from(connection.createStatement(
"UPDATE products SET stock_quantity = stock_quantity + :quantity WHERE id = :id")
.bind("quantity", quantity)
.bind("id", toProductId)
.execute()))
.then(Mono.from(connection.commitTransaction()))
.onErrorResume(error ->
Mono.from(connection.rollbackTransaction())
.then(Mono.error(error)))
).then();
}
// Complex transactional operation
public Mono<Order> placeOrder(Long userId, Long productId, Integer quantity) {
return transactionalOperator.execute(status ->
productRepository.findById(productId)
.switchIfEmpty(Mono.error(new RuntimeException("Product not found")))
.flatMap(product -> {
if (product.getStockQuantity() < quantity) {
return Mono.error(new RuntimeException("Insufficient stock"));
}
// Calculate total amount
BigDecimal totalAmount = product.getPrice().multiply(BigDecimal.valueOf(quantity));
// Create order
Order order = new Order(userId, totalAmount, "CONFIRMED");
// Update stock and create order in transaction
return productRepository.decreaseStock(productId, quantity)
.flatMap(updated -> {
if (updated == 0) {
return Mono.error(new RuntimeException("Failed to update stock"));
}
return orderRepository.save(order);
});
})
);
}
// Read-only transaction
public Mono<BigDecimal> getUserTotalOrderAmount(Long userId) {
return databaseClient.sql("""
SELECT COALESCE(SUM(total_amount), 0) as total
FROM orders
WHERE user_id = :userId AND status = 'COMPLETED'
""")
.bind("userId", userId)
.map((row, metadata) -> row.get("total", BigDecimal.class))
.one()
.as(transactionalOperator::transactional);
}
}
8. Reactive Web Controller
package com.example.r2dbc.controller;
import com.example.r2dbc.entity.User;
import com.example.r2dbc.service.UserService;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
import java.time.Duration;
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<ResponseEntity<User>> createUser(@Valid @RequestBody User user) {
return userService.createUserWithValidation(user)
.map(ResponseEntity::ok)
.onErrorResume(error ->
Mono.just(ResponseEntity.badRequest().build()));
}
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {
return userService.getUserById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping
public Flux<User> getAllUsers(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
return userService.getUsersPaginated(page, size);
}
@GetMapping("/search")
public Flux<User> searchUsers(
@RequestParam(required = false) String name,
@RequestParam(required = false) Integer minAge,
@RequestParam(required = false) Integer maxAge) {
return userService.searchUsers(name, minAge, maxAge);
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.getAllUsers()
.delayElements(Duration.ofSeconds(1)); // Simulate real-time stream
}
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable Long id, @Valid @RequestBody User user) {
return userService.updateUserWithAgeValidation(id, user)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build())
.onErrorResume(error ->
Mono.just(ResponseEntity.badRequest().build()));
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable Long id) {
return userService.deleteUser(id)
.then(Mono.just(ResponseEntity.noContent().<Void>build()))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping("/count")
public Mono<ResponseEntity<Long>> countUsers() {
return userService.countUsers()
.map(ResponseEntity::ok);
}
@PostMapping("/batch")
@ResponseStatus(HttpStatus.CREATED)
public Flux<User> createUsersBatch(@RequestBody Flux<User> users) {
return userService.createUsersBatch(users);
}
}
9. Error Handling and Resilience
package com.example.r2dbc.config;
import io.r2dbc.spi.R2dbcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.bind.support.WebExchangeBindException;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RestControllerAdvice
public class GlobalErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@ExceptionHandler(R2dbcException.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Mono<Map<String, Object>> handleDatabaseException(R2dbcException ex) {
log.error("Database error occurred: {}", ex.getMessage(), ex);
Map<String, Object> errorResponse = new HashMap<>();
errorResponse.put("error", "Database operation failed");
errorResponse.put("message", ex.getMessage());
errorResponse.put("code", ex.getErrorCode());
return Mono.just(errorResponse);
}
@ExceptionHandler(WebExchangeBindException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<Map<String, Object>> handleValidationException(WebExchangeBindException ex) {
Map<String, Object> errorResponse = new HashMap<>();
errorResponse.put("error", "Validation failed");
errorResponse.put("message", "Invalid request parameters");
errorResponse.put("details", ex.getBindingResult().getAllErrors());
return Mono.just(errorResponse);
}
@ExceptionHandler(RuntimeException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<Map<String, Object>> handleBusinessException(RuntimeException ex) {
Map<String, Object> errorResponse = new HashMap<>();
errorResponse.put("error", "Business rule violation");
errorResponse.put("message", ex.getMessage());
return Mono.just(errorResponse);
}
}
// Circuit breaker pattern for R2DBC operations
package com.example.r2dbc.resilience;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Component
public class DatabaseCircuitBreaker {
private final CircuitBreaker circuitBreaker;
public DatabaseCircuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.permittedNumberOfCallsInHalfOpenState(10)
.slidingWindowSize(20)
.build();
this.circuitBreaker = CircuitBreaker.of("database", config);
}
public <T> Mono<T> protect(Mono<T> databaseCall) {
return databaseCall
.transform(CircuitBreakerOperator.of(circuitBreaker))
.onErrorResume(error -> {
// Fallback logic
return Mono.error(error);
});
}
}
10. Testing R2DBC Applications
package com.example.r2dbc.test;
import com.example.r2dbc.entity.User;
import com.example.r2dbc.repository.UserRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.List;
@Testcontainers
@SpringBootTest
class UserRepositoryTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:13")
.withDatabaseName("testdb")
.withUsername("test")
.withPassword("test");
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.r2dbc.url", () ->
"r2dbc:postgresql://" + postgres.getHost() + ":" + postgres.getFirstMappedPort() + "/testdb");
registry.add("spring.r2dbc.username", () -> "test");
registry.add("spring.r2dbc.password", () -> "test");
}
@Autowired
private UserRepository userRepository;
@Test
void testSaveAndFindUser() {
User user = new User("[email protected]", "Test User", 30);
Mono<User> savedUser = userRepository.save(user);
Mono<User> foundUser = savedUser.flatMap(u -> userRepository.findByEmail(u.getEmail()));
StepVerifier.create(foundUser)
.expectNextMatches(u ->
u.getEmail().equals("[email protected]") &&
u.getName().equals("Test User"))
.verifyComplete();
}
@Test
void testFindByAgeGreaterThan() {
User user1 = new User("[email protected]", "User One", 25);
User user2 = new User("[email protected]", "User Two", 35);
Flux<User> users = userRepository.saveAll(List.of(user1, user2))
.thenMany(userRepository.findByAgeGreaterThan(30));
StepVerifier.create(users)
.expectNextMatches(user -> user.getAge() > 30)
.verifyComplete();
}
@Test
void testTransactionRollback() {
User validUser = new User("[email protected]", "Valid User", 25);
User invalidUser = new User(null, "Invalid User", 25); // Email cannot be null
Flux<User> saveOperation = userRepository.saveAll(Flux.just(validUser, invalidUser));
StepVerifier.create(saveOperation)
.expectError()
.verify();
// Verify that no users were saved due to transaction rollback
StepVerifier.create(userRepository.count())
.expectNext(0L)
.verifyComplete();
}
@Test
void testBackpressure() {
Flux<User> users = Flux.range(1, 100)
.map(i -> new User("user" + i + "@example.com", "User " + i, 20 + i))
.flatMap(userRepository::save)
.thenMany(userRepository.findAll());
StepVerifier.create(users, 0) // Request 0 initially to test backpressure
.thenRequest(10) // Request 10 elements
.expectNextCount(10)
.thenRequest(20) // Request 20 more
.expectNextCount(20)
.thenCancel()
.verify(Duration.ofSeconds(5));
}
}
Best Practices for R2DBC
1. Connection Management:
- Use connection pooling appropriately
- Set reasonable pool sizes based on workload
- Monitor connection usage and leaks
- Implement proper connection timeout handling
2. Reactive Programming:
- Always handle errors in reactive chains
- Implement proper backpressure strategies
- Use appropriate operators for transformation
- Avoid blocking operations in reactive chains
3. Performance Optimization:
- Use prepared statements for repeated queries
- Implement proper indexing in the database
- Use batch operations for bulk inserts/updates
- Monitor and optimize query performance
4. Testing Strategies:
- Use Testcontainers for integration testing
- Implement comprehensive unit tests
- Test error scenarios and edge cases
- Verify reactive behavior and backpressure
Conclusion
R2DBC provides a powerful foundation for building reactive, non-blocking database applications in Java. Key benefits include:
- True Non-blocking I/O: No thread blocking during database operations
- Backpressure Support: Controlled data flow between producer and consumer
- Reactive Streams Integration: Seamless integration with Reactive Streams
- Functional API: Fluent, functional programming style
- Spring Integration: Excellent Spring ecosystem support
Implementation Checklist:
- ✅ Configure R2DBC connection factory and pooling
- ✅ Define reactive entity classes
- ✅ Implement reactive repositories
- ✅ Create service layer with reactive operations
- ✅ Implement proper transaction management
- ✅ Add comprehensive error handling
- ✅ Write thorough tests with Testcontainers
- ✅ Implement monitoring and metrics
R2DBC is particularly well-suited for:
- Microservices architectures with high concurrency
- Real-time applications requiring low latency
- Event-driven systems with streaming data
- High-throughput applications needing efficient resource utilization
By following these patterns and best practices, you can build scalable, efficient, and resilient reactive database applications with R2DBC.