Functional endpoints in Spring WebFlux provide an alternative to annotation-based controllers using a functional programming style. Routes and handlers are declared as plain functions (RouterFunction and HandlerFunction) instead of annotated methods, which keeps routing explicit, composable, and easy to test.
1. Basic Setup
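Dependencies (Maven)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <!-- Needed for the Validator injected into UserHandler in section 4 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Needed for the Lombok annotations on the User entity in section 4 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>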
Main Application Class
@SpringBootApplication
public class WebFluxFunctionalApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebFluxFunctionalApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(UserHandler userHandler) {
        return userHandler.routes();
    }
}
2. Core Components
HandlerFunction
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    Mono<T> handle(ServerRequest request);
}
RouterFunction
@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
    Mono<HandlerFunction<T>> route(ServerRequest request);
}
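To make the relationship between the two interfaces concrete, the following is a simplified, framework-free model rather than Spring's actual implementation. Request, Handler, Router, and RouterModelDemo are hypothetical stand-ins for ServerRequest, HandlerFunction, RouterFunction, and the dispatching WebFlux performs; Optional plays the role of Mono. It sketches how a router either resolves a request to a handler or reports no match, and how two routers compose so that the first match wins.

```java
import java.util.Optional;

// Hypothetical stand-in for ServerRequest: only the method and path are modeled.
final class Request {
    final String method;
    final String path;

    Request(String method, String path) {
        this.method = method;
        this.path = path;
    }
}

// Hypothetical stand-in for HandlerFunction: turns a request into a response body.
interface Handler {
    String handle(Request request);
}

// Hypothetical stand-in for RouterFunction.
interface Router {
    // An empty result means "no route matched", like an empty Mono from RouterFunction.route.
    Optional<Handler> route(Request request);

    // Try this router first and fall back to the other one if nothing matched.
    default Router and(Router other) {
        return request -> {
            Optional<Handler> match = route(request);
            return match.isPresent() ? match : other.route(request);
        };
    }

    // Builds a router that matches GET requests for one exact path.
    static Router get(String path, Handler handler) {
        return request -> {
            boolean matches = "GET".equals(request.method) && path.equals(request.path);
            return matches ? Optional.of(handler) : Optional.<Handler>empty();
        };
    }
}

final class RouterModelDemo {
    // Resolves the handler and invokes it, or returns a placeholder when no route matched.
    static String dispatch(Router router, Request request) {
        return router.route(request)
                .map(handler -> handler.handle(request))
                .orElse("404");
    }
}
```

Composing Router.get("/hello", ...) with Router.get("/health", ...) through and(...) means a GET for /health falls through the first router and is served by the second, while a request that matches neither ends in the "404" placeholder.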
3. Basic Functional Endpoint
Simple Handler
@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
                .GET("/hello", this::hello)
                .GET("/hello/{name}", this::helloName)
                .POST("/hello", this::createHello)
                .build();
    }

    public Mono<ServerResponse> hello(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(Map.of("message", "Hello World!"));
    }

    public Mono<ServerResponse> helloName(ServerRequest request) {
        String name = request.pathVariable("name");
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(Map.of("message", "Hello " + name + "!"));
    }

    public Mono<ServerResponse> createHello(ServerRequest request) {
        // A typed reference avoids the raw Map type and the unchecked cast
        return request.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {})
                .flatMap(body -> {
                    // Map.of rejects null values, so fall back to an empty string when "message" is absent
                    String message = String.valueOf(body.getOrDefault("message", ""));
                    return ServerResponse.ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .bodyValue(Map.of("received", message));
                });
    }
}
4. CRUD Operations with Reactive Repository
Entity Class
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "users")
public class User {
    @Id
    private String id;
    private String name;
    private String email;
    private int age;
    private LocalDateTime createdAt;
}
Reactive Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {
    Flux<User> findByName(String name);
    Mono<User> findByEmail(String email);
}
User Handler with Full CRUD
// Validator, ConstraintViolation and ValidationException come from jakarta.validation
// (javax.validation on Spring Boot 2.x)
@Component
public class UserHandler {

    private final UserRepository userRepository;
    private final Validator validator;

    public UserHandler(UserRepository userRepository, Validator validator) {
        this.userRepository = userRepository;
        this.validator = validator;
    }

    public RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
                .GET("/users", this::getAllUsers)
                .GET("/users/{id}", this::getUserById)
                .GET("/users/name/{name}", this::getUsersByName)
                .POST("/users", this::createUser)
                .PUT("/users/{id}", this::updateUser)
                .DELETE("/users/{id}", this::deleteUser)
                .filter(this::errorHandlingFilter)
                .build();
    }

    // GET all users
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        Flux<User> users = userRepository.findAll();
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(users, User.class);
    }

    // GET user by ID
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.findById(id)
                .flatMap(user -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    // GET users by name
    public Mono<ServerResponse> getUsersByName(ServerRequest request) {
        String name = request.pathVariable("name");
        Flux<User> users = userRepository.findByName(name);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(users, User.class);
    }
    // POST create user
    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
                .doOnNext(this::validate)
                .flatMap(user -> {
                    user.setId(null); // Ensure new ID is generated
                    user.setCreatedAt(LocalDateTime.now());
                    return userRepository.save(user);
                })
                .flatMap(savedUser -> ServerResponse
                        .created(URI.create("/users/" + savedUser.getId()))
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(savedUser))
                .onErrorResume(ValidationException.class, e ->
                        ServerResponse.badRequest()
                                .bodyValue(Map.of("error", e.getMessage())));
    }

    // PUT update user
    public Mono<ServerResponse> updateUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.findById(id)
                .zipWith(request.bodyToMono(User.class))
                .flatMap(tuple -> {
                    User existingUser = tuple.getT1();
                    User updatedUser = tuple.getT2();
                    existingUser.setName(updatedUser.getName());
                    existingUser.setEmail(updatedUser.getEmail());
                    existingUser.setAge(updatedUser.getAge());
                    return userRepository.save(existingUser);
                })
                .flatMap(savedUser -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(savedUser))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    // DELETE user
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.existsById(id)
                .flatMap(exists -> {
                    if (exists) {
                        return userRepository.deleteById(id)
                                .then(ServerResponse.noContent().build());
                    } else {
                        return ServerResponse.notFound().build();
                    }
                });
    }
    private void validate(User user) {
        Set<ConstraintViolation<User>> violations = validator.validate(user);
        if (!violations.isEmpty()) {
            String message = violations.stream()
                    .map(ConstraintViolation::getMessage)
                    .collect(Collectors.joining(", "));
            throw new ValidationException(message);
        }
    }

    // Error handling filter
    private Mono<ServerResponse> errorHandlingFilter(ServerRequest request,
                                                     HandlerFunction<ServerResponse> next) {
        return next.handle(request)
                .onErrorResume(ValidationException.class, e ->
                        ServerResponse.badRequest()
                                .bodyValue(Map.of("error", e.getMessage())))
                .onErrorResume(DataAccessException.class, e ->
                        ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                .bodyValue(Map.of("error", "Database error")))
                .onErrorResume(Exception.class, e ->
                        ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                .bodyValue(Map.of("error", "Internal server error")));
    }
}
5. Advanced Routing with Nested Routes
Nested Router Configuration
// Requires: import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
// ProductHandler and UserHandler.advancedRoutes() are referenced here but not shown in this guide.
@Configuration
public class ApiRouter {

    private final UserHandler userHandler;
    private final ProductHandler productHandler;

    public ApiRouter(UserHandler userHandler, ProductHandler productHandler) {
        this.userHandler = userHandler;
        this.productHandler = productHandler;
    }

    @Bean
    public RouterFunction<ServerResponse> apiRoutes() {
        return RouterFunctions.route()
                .path("/api", builder -> builder
                        .nest(accept(MediaType.APPLICATION_JSON), nestedBuilder -> nestedBuilder
                                .path("/v1", this::v1Routes)
                                .path("/v2", this::v2Routes)
                        )
                )
                .build();
    }

    private RouterFunction<ServerResponse> v1Routes() {
        return RouterFunctions.route()
                // UserHandler.routes() already declares its paths under "/users", so it is added as-is;
                // nesting it under path("/users", ...) would produce /api/v1/users/users
                .add(userHandler.routes())
                .add(productHandler.routes())
                .GET("/health", request -> ServerResponse.ok().bodyValue("API v1 is healthy"))
                .build();
    }

    private RouterFunction<ServerResponse> v2Routes() {
        return RouterFunctions.route()
                .add(userHandler.routes())
                .add(productHandler.routes())
                .GET("/health", request -> ServerResponse.ok().bodyValue("API v2 is healthy"))
                .add(userHandler.advancedRoutes()) // Additional v2 specific routes
                .build();
    }
}
6. Request Predicates and Filters
Custom Request Predicates
@Component
public class SecurityHandler {

    public RouterFunction<ServerResponse> securedRoutes() {
        // GET(pattern, predicate, handler) already matches the method and path, so the extra
        // predicate only carries the additional conditions. When a predicate fails, the route
        // does not match and the request falls through (typically ending in 404, not 401/403).
        return RouterFunctions.route()
                .GET("/admin/users",
                        RequestPredicates.accept(MediaType.APPLICATION_JSON)
                                .and(this::isAdmin),
                        this::getAdminUsers)
                .GET("/profile", this::isAuthenticated, this::getUserProfile)
                .build();
    }

    private boolean isAdmin(ServerRequest request) {
        // Check admin role from headers or tokens
        String role = request.headers().firstHeader("X-User-Role");
        return "admin".equals(role);
    }

    private boolean isAuthenticated(ServerRequest request) {
        String authHeader = request.headers().firstHeader("Authorization");
        return authHeader != null && authHeader.startsWith("Bearer ");
    }

    public Mono<ServerResponse> getAdminUsers(ServerRequest request) {
        // Return admin-only user data
        return ServerResponse.ok().bodyValue(Map.of("message", "Admin users data"));
    }

    public Mono<ServerResponse> getUserProfile(ServerRequest request) {
        String userId = extractUserId(request);
        return ServerResponse.ok().bodyValue(Map.of("userId", userId, "profile", "user data"));
    }

    private String extractUserId(ServerRequest request) {
        // Extract user ID from token
        return "user123";
    }
}
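The token handling above is left as a placeholder. As one possible starting point, the sketch below pulls the token out of an Authorization header value using only the JDK. BearerTokens and extract are hypothetical names, not part of Spring, and the sketch assumes the token is whatever follows the "Bearer" scheme. It does not verify or decode the token; that step would still need a proper library or Spring Security.

```java
import java.util.Optional;

// Hypothetical helper: extracts the raw token from an "Authorization: Bearer <token>" value.
final class BearerTokens {
    private static final String PREFIX = "Bearer ";

    private BearerTokens() {
    }

    // Returns the token if the header uses the Bearer scheme and the token is non-empty.
    static Optional<String> extract(String authorizationHeader) {
        if (authorizationHeader == null) {
            return Optional.empty();
        }
        // The scheme name is compared case-insensitively
        if (!authorizationHeader.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return Optional.empty();
        }
        String token = authorizationHeader.substring(PREFIX.length()).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }
}
```

A predicate such as isAuthenticated could then be written as BearerTokens.extract(header).isPresent(), which also rejects a header that contains the scheme but no token.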
Advanced Filtering
@Component
public class LoggingFilter {

    public HandlerFilterFunction<ServerResponse, ServerResponse> logging() {
        return (request, next) -> {
            long startTime = System.currentTimeMillis();
            String path = request.path();
            // method().name() replaces methodName(), which is deprecated in Spring Framework 6
            String method = request.method().name();
            System.out.println("Started: " + method + " " + path);
            return next.handle(request)
                    .doOnSuccess(response -> {
                        long duration = System.currentTimeMillis() - startTime;
                        System.out.println("Completed: " + method + " " + path +
                                " - Status: " + response.statusCode() +
                                " - Duration: " + duration + "ms");
                    })
                    .doOnError(error -> {
                        long duration = System.currentTimeMillis() - startTime;
                        System.out.println("Failed: " + method + " " + path +
                                " - Error: " + error.getMessage() +
                                " - Duration: " + duration + "ms");
                    });
        };
    }

    public HandlerFilterFunction<ServerResponse, ServerResponse> rateLimiting() {
        return (request, next) -> {
            String clientIp = getClientIp(request);
            // Implement rate limiting logic here
            if (isRateLimited(clientIp)) {
                return ServerResponse.status(HttpStatus.TOO_MANY_REQUESTS)
                        .bodyValue(Map.of("error", "Rate limit exceeded"));
            }
            return next.handle(request);
        };
    }

    private String getClientIp(ServerRequest request) {
        // Extract client IP from request
        return request.remoteAddress()
                .map(addr -> addr.getAddress().getHostAddress())
                .orElse("unknown");
    }

    private boolean isRateLimited(String clientIp) {
        // Implement rate limiting logic
        return false;
    }
}
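The isRateLimited method above is a stub that always returns false. One simple way to fill it in is a fixed-window counter per client, sketched below with JDK classes only. FixedWindowRateLimiter, its constructor parameters, and the explicit nowMillis argument are assumptions made for illustration (passing the clock value in keeps the logic easy to test). The map is never pruned here, so a real deployment would need eviction or an external store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical fixed-window limiter: at most maxRequests per client in each window.
final class FixedWindowRateLimiter {

    // Immutable snapshot of one client's current window.
    private static final class Window {
        final long startMillis;
        final int count;

        Window(long startMillis, int count) {
            this.startMillis = startMillis;
            this.count = count;
        }
    }

    private final int maxRequests;
    private final long windowMillis;
    private final ConcurrentMap<String, Window> windows = new ConcurrentHashMap<>();

    FixedWindowRateLimiter(int maxRequests, long windowMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
    }

    // Records one request for the client and reports whether it exceeds the limit.
    boolean isRateLimited(String clientIp, long nowMillis) {
        // compute() updates the entry atomically for a given key
        Window updated = windows.compute(clientIp, (key, current) -> {
            if (current == null || nowMillis - current.startMillis >= windowMillis) {
                return new Window(nowMillis, 1); // start a fresh window
            }
            return new Window(current.startMillis, current.count + 1);
        });
        return updated.count > maxRequests;
    }
}
```

Inside the filter, the stub could delegate to a shared instance, for example limiter.isRateLimited(clientIp, System.currentTimeMillis()). A fixed window allows short bursts around window boundaries; a sliding window or token bucket trades more bookkeeping for smoother limits.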
7. Streaming and SSE (Server-Sent Events)
Streaming Handler
@Component
public class StreamHandler {

    public RouterFunction<ServerResponse> streamRoutes() {
        return RouterFunctions.route()
                .GET("/stream/numbers", this::streamNumbers)
                .GET("/stream/users", this::streamUsers)
                .GET("/events", this::serverSentEvents)
                .build();
    }

    public Mono<ServerResponse> streamNumbers(ServerRequest request) {
        Flux<Long> numberStream = Flux.interval(Duration.ofSeconds(1))
                .take(10); // Emit 10 numbers with 1-second interval
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(numberStream, Long.class);
    }

    public Mono<ServerResponse> streamUsers(ServerRequest request) {
        Flux<User> userStream = Flux.interval(Duration.ofSeconds(2))
                .map(i -> new User(
                        "id" + i,
                        "User " + i,
                        "user" + i + "@example.com",
                        20 + i.intValue(),
                        LocalDateTime.now()
                ))
                .take(5);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(userStream, User.class);
    }

    public Mono<ServerResponse> serverSentEvents(ServerRequest request) {
        Flux<ServerSentEvent<Object>> eventStream = Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.builder()
                        .id(String.valueOf(sequence))
                        .event("message")
                        .data("Event " + sequence + " at " + LocalDateTime.now())
                        .build())
                .take(10);
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(eventStream, ServerSentEvent.class);
    }
}
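It can help to see what a client receives from the /events route. The sketch below formats one event in the text/event-stream wire format using plain Java. SseFrames and format are hypothetical names for illustration; WebFlux performs this encoding itself, and its exact output may differ in details such as spacing.

```java
// Hypothetical helper that renders one server-sent event as it appears on the wire.
final class SseFrames {

    private SseFrames() {
    }

    // Writes optional id and event fields, one data field per payload line, then a blank line.
    static String format(String id, String event, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id:").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event:").append(event).append('\n');
        }
        // Multi-line payloads are split so that each line gets its own data field
        for (String line : data.split("\n", -1)) {
            frame.append("data:").append(line).append('\n');
        }
        // The empty line tells the client that the event is complete
        return frame.append('\n').toString();
    }
}
```

For the first event of the handler above, the frame would carry the fields id:0, event:message, and a data line with the generated text, followed by an empty line. A browser EventSource delivers it to listeners registered for the "message" event type.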
8. File Upload and Download
File Handler
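@Component
public class FileHandler {

    // Absolute, normalized base directory so the containment check below is reliable
    private final Path uploadDir = Paths.get("uploads").toAbsolutePath().normalize();

    public FileHandler() {
        try {
            Files.createDirectories(uploadDir);
        } catch (IOException e) {
            throw new RuntimeException("Could not create upload directory", e);
        }
    }

    public RouterFunction<ServerResponse> fileRoutes() {
        return RouterFunctions.route()
                .POST("/upload", this::uploadFile)
                .GET("/download/{filename}", this::downloadFile)
                .GET("/files", this::listFiles)
                .build();
    }

    // Resolves a client-supplied name inside uploadDir; empty if it would escape the directory
    private Optional<Path> resolveInUploadDir(String filename) {
        Path resolved = uploadDir.resolve(filename).normalize();
        return resolved.startsWith(uploadDir) && !resolved.equals(uploadDir)
                ? Optional.of(resolved)
                : Optional.empty();
    }

    public Mono<ServerResponse> uploadFile(ServerRequest request) {
        return request.multipartData()
                .flatMap(parts -> {
                    // A plain form field named "file" is not a FilePart, so check before casting
                    Part part = parts.getFirst("file");
                    if (!(part instanceof FilePart)) {
                        return ServerResponse.badRequest()
                                .bodyValue(Map.of("error", "File not provided"));
                    }
                    FilePart filePart = (FilePart) part;
                    // Reject names such as "../x" that point outside the upload directory
                    Optional<Path> target = resolveInUploadDir(filePart.filename());
                    if (target.isEmpty()) {
                        return ServerResponse.badRequest()
                                .bodyValue(Map.of("error", "Invalid filename"));
                    }
                    return filePart.transferTo(target.get())
                            .then(ServerResponse.ok()
                                    .bodyValue(Map.of(
                                            "message", "File uploaded successfully",
                                            "filename", filePart.filename(),
                                            // -1 when the part has no Content-Length header, which is common
                                            "size", filePart.headers().getContentLength()
                                    )));
                });
    }

    public Mono<ServerResponse> downloadFile(ServerRequest request) {
        Optional<Path> filePath = resolveInUploadDir(request.pathVariable("filename"));
        if (filePath.isEmpty() || !Files.isRegularFile(filePath.get())) {
            return ServerResponse.notFound().build();
        }
        Resource resource = new FileSystemResource(filePath.get());
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                // ContentDisposition takes care of quoting the filename
                .header(HttpHeaders.CONTENT_DISPOSITION,
                        ContentDisposition.attachment().filename(resource.getFilename()).build().toString())
                .bodyValue(resource);
    }

    public Mono<ServerResponse> listFiles(ServerRequest request) {
        // Files.list is blocking I/O, so it runs on the bounded elastic scheduler, and the
        // directory stream is closed by try-with-resources. Collecting to a List yields a JSON
        // array; a Flux<String> body would be written as raw text chunks instead.
        return Mono.fromCallable(() -> {
                    try (Stream<Path> paths = Files.list(uploadDir)) {
                        return paths.map(path -> path.getFileName().toString())
                                .collect(Collectors.toList());
                    }
                })
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(files -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(files))
                .onErrorResume(IOException.class, e ->
                        ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                .bodyValue(Map.of("error", "Could not list files")));
    }
}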
9. Testing Functional Endpoints
Test Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
WebTestClient Tests
@SpringBootTest
@AutoConfigureWebTestClient
class UserHandlerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserRepository userRepository;

    @Test
    void shouldGetAllUsers() {
        User user1 = new User("1", "John", "john@example.com", 25, LocalDateTime.now());
        User user2 = new User("2", "Jane", "jane@example.com", 30, LocalDateTime.now());
        when(userRepository.findAll()).thenReturn(Flux.just(user1, user2));

        webTestClient.get().uri("/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class)
                .hasSize(2)
                .contains(user1, user2);
    }

    @Test
    void shouldCreateUser() {
        User newUser = new User(null, "John", "john@example.com", 25, null);
        User savedUser = new User("1", "John", "john@example.com", 25, LocalDateTime.now());
        when(userRepository.save(any(User.class))).thenReturn(Mono.just(savedUser));

        webTestClient.post().uri("/users")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(newUser)
                .exchange()
                .expectStatus().isCreated()
                .expectBody(User.class)
                .isEqualTo(savedUser);
    }

    @Test
    void shouldReturnNotFoundForNonExistentUser() {
        when(userRepository.findById("999")).thenReturn(Mono.empty());

        webTestClient.get().uri("/users/999")
                .exchange()
                .expectStatus().isNotFound();
    }
}
10. Configuration and Best Practices
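Application Configuration
# application.yml
spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/webflux
  webflux:
    # Prefixes every route, including those ApiRouter already nests under "/api";
    # keep only one of the two to avoid /api/api/... URLs
    base-path: /api
server:
  port: 8080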
Best Practices
- Keep handlers focused and single-responsibility
- Use dependency injection for handlers
- Implement proper error handling
- Use reactive types (Mono/Flux) consistently
- Test with WebTestClient
- Use filters for cross-cutting concerns
- Implement proper validation
- Use nested routes for API versioning
- Implement proper logging and monitoring
- Use ServerSentEvent for real-time updates
Functional endpoints in WebFlux provide a powerful, flexible way to build reactive web applications with fine-grained control over request handling and routing.