Spring Cloud Bus in Java

Introduction

Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This allows you to broadcast state changes (like configuration changes) across all instances of your microservices. It's particularly useful for refreshing configuration across multiple service instances without restarting them.

Architecture Overview

1. Spring Cloud Bus Architecture

Config Server → Spring Cloud Bus → Message Broker → Microservice Instances
↓               ↓                 ↓                 ↓
Config Change   Broadcast Event   RabbitMQ/Kafka   Refresh Config
↓               ↓                 ↓                 ↓
Git/DB/etc.     ApplicationEvent   Message Queue   @RefreshScope

Setup and Dependencies

1. Maven Dependencies

<properties>
<spring-boot.version>3.1.0</spring-boot.version>
<spring-cloud.version>2022.0.4</spring-cloud.version>
</properties>
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Spring Cloud Bus -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!-- Alternative: Spring Cloud Bus with Kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<!-- Spring Cloud Config Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- Spring Boot DevTools (for automatic restart) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>${spring-boot.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

2. Application Configuration

# application.yml
spring:
application:
name: user-service
cloud:
bus:
enabled: true
refresh:
enabled: true
# RabbitMQ Configuration
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# Alternatively, Kafka Configuration
# kafka:
#   bootstrap-servers: localhost:9092
#   consumer:
#     group-id: spring-cloud-bus
# Actuator Endpoints
management:
endpoints:
web:
exposure:
include: health,info,busrefresh,metrics,busenv
jmx:
exposure:
include: "*"
endpoint:
health:
show-details: always
busrefresh:
enabled: true
busenv:
enabled: true
# Logging for debugging
logging:
level:
org.springframework.cloud.bus: DEBUG
com.example.bus: DEBUG

Core Bus Implementation

1. Bus Event Configuration

@Configuration
@Slf4j
public class BusConfiguration {
@Bean
public BusBridge busBridge(ApplicationEventPublisher publisher) {
return new CustomBusBridge(publisher);
}
@Bean
public BusPropertiesCustomizer busPropertiesCustomizer() {
return properties -> {
// Customize bus properties
properties.setId("user-service-" + UUID.randomUUID().toString().substring(0, 8));
properties.getRabbit().setClaimedId(properties.getId());
};
}
@EventListener
public void onBusEvent(AckRemoteApplicationEvent event) {
log.info("Received bus acknowledgement from: {} for event: {}", 
event.getOriginService(), event.getAckId());
}
@EventListener
public void onRefreshEvent(RefreshRemoteApplicationEvent event) {
log.info("Received refresh event from: {}", event.getOriginService());
// Custom refresh logic can be added here
}
@EventListener
public void onEnvironmentChangeEvent(EnvironmentChangeRemoteApplicationEvent event) {
log.info("Environment change event received. Keys: {}", event.getValues().keySet());
// Handle specific environment changes
event.getValues().forEach((key, value) -> 
log.debug("Config changed - {}: {}", key, value));
}
}
@Component
@Slf4j
public class CustomBusBridge implements BusBridge {
private final ApplicationEventPublisher publisher;
public CustomBusBridge(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@Override
public void send(RemoteApplicationEvent event) {
log.info("Sending bus event: {} to destination: {}", 
event.getClass().getSimpleName(), event.getDestinationService());
// Custom logic before sending event
publisher.publishEvent(event);
}
}

2. Custom Bus Events

// Custom bus event for user-related changes
public class UserChangeEvent extends RemoteApplicationEvent {
private String userId;
private UserChangeType changeType;
private Map<String, Object> changeDetails;
// Required for deserialization
public UserChangeEvent() {
super();
}
public UserChangeEvent(Object source, String originService, 
String destinationService, String userId, 
UserChangeType changeType, Map<String, Object> changeDetails) {
super(source, originService, destinationService);
this.userId = userId;
this.changeType = changeType;
this.changeDetails = changeDetails != null ? changeDetails : new HashMap<>();
}
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public UserChangeType getChangeType() { return changeType; }
public void setChangeType(UserChangeType changeType) { this.changeType = changeType; }
public Map<String, Object> getChangeDetails() { return changeDetails; }
public void setChangeDetails(Map<String, Object> changeDetails) { 
this.changeDetails = changeDetails; 
}
public enum UserChangeType {
CREATED, UPDATED, DELETED, PROFILE_CHANGED, ROLE_CHANGED
}
}
// Event for cache invalidation
public class CacheInvalidationEvent extends RemoteApplicationEvent {
private String cacheName;
private List<String> keys;
private boolean clearAll;
public CacheInvalidationEvent() {
super();
}
public CacheInvalidationEvent(Object source, String originService, 
String destinationService, String cacheName, 
List<String> keys, boolean clearAll) {
super(source, originService, destinationService);
this.cacheName = cacheName;
this.keys = keys != null ? keys : new ArrayList<>();
this.clearAll = clearAll;
}
// Getters and setters
public String getCacheName() { return cacheName; }
public void setCacheName(String cacheName) { this.cacheName = cacheName; }
public List<String> getKeys() { return keys; }
public void setKeys(List<String> keys) { this.keys = keys; }
public boolean isClearAll() { return clearAll; }
public void setClearAll(boolean clearAll) { this.clearAll = clearAll; }
}
// Event for feature flag changes
public class FeatureFlagChangeEvent extends RemoteApplicationEvent {
private String featureName;
private boolean enabled;
private Map<String, Object> parameters;
public FeatureFlagChangeEvent() {
super();
}
public FeatureFlagChangeEvent(Object source, String originService,
String destinationService, String featureName,
boolean enabled, Map<String, Object> parameters) {
super(source, originService, destinationService);
this.featureName = featureName;
this.enabled = enabled;
this.parameters = parameters != null ? parameters : new HashMap<>();
}
// Getters and setters
public String getFeatureName() { return featureName; }
public void setFeatureName(String featureName) { this.featureName = featureName; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public Map<String, Object> getParameters() { return parameters; }
public void setParameters(Map<String, Object> parameters) { this.parameters = parameters; }
}

Event Handling Services

1. Bus Event Dispatcher Service

@Service
@Slf4j
public class BusEventDispatcherService {
private final ApplicationEventPublisher eventPublisher;
private final BusProperties busProperties;
public BusEventDispatcherService(ApplicationEventPublisher eventPublisher,
BusProperties busProperties) {
this.eventPublisher = eventPublisher;
this.busProperties = busProperties;
}
/**
* Send user change event to all services
*/
public void broadcastUserChange(String userId, UserChangeEvent.UserChangeType changeType, 
Map<String, Object> changeDetails) {
UserChangeEvent event = new UserChangeEvent(
this,
busProperties.getId(),
"*", // Send to all services
userId,
changeType,
changeDetails
);
eventPublisher.publishEvent(event);
log.info("Broadcasted user change event: {} for user: {}", changeType, userId);
}
/**
* Send cache invalidation event to specific service
*/
public void sendCacheInvalidation(String destinationService, String cacheName, 
List<String> keys, boolean clearAll) {
CacheInvalidationEvent event = new CacheInvalidationEvent(
this,
busProperties.getId(),
destinationService,
cacheName,
keys,
clearAll
);
eventPublisher.publishEvent(event);
log.info("Sent cache invalidation to: {} for cache: {}", destinationService, cacheName);
}
/**
* Broadcast feature flag change to all services
*/
public void broadcastFeatureFlagChange(String featureName, boolean enabled, 
Map<String, Object> parameters) {
FeatureFlagChangeEvent event = new FeatureFlagChangeEvent(
this,
busProperties.getId(),
"*",
featureName,
enabled,
parameters
);
eventPublisher.publishEvent(event);
log.info("Broadcasted feature flag change: {} -> {}", featureName, enabled);
}
/**
* Send environment refresh event
*/
public void broadcastEnvironmentRefresh() {
RefreshRemoteApplicationEvent event = new RefreshRemoteApplicationEvent(
this,
busProperties.getId(),
"*"
);
eventPublisher.publishEvent(event);
log.info("Broadcasted environment refresh event");
}
/**
* Send specific environment change
*/
public void broadcastEnvironmentChange(Map<String, String> changes) {
EnvironmentChangeRemoteApplicationEvent event = new EnvironmentChangeRemoteApplicationEvent(
this,
busProperties.getId(),
"*",
changes
);
eventPublisher.publishEvent(event);
log.info("Broadcasted environment change event with {} keys", changes.size());
}
/**
* Send event to specific service instance
*/
public void sendToService(String serviceId, String destinationService, RemoteApplicationEvent event) {
// Custom logic for targeting specific service instances
eventPublisher.publishEvent(event);
log.info("Sent event to service: {} -> {}", serviceId, destinationService);
}
}

2. Bus Event Handler Service

@Service
@Slf4j
public class BusEventHandlerService {
private final UserService userService;
private final CacheManager cacheManager;
private final FeatureFlagService featureFlagService;
public BusEventHandlerService(UserService userService,
CacheManager cacheManager,
FeatureFlagService featureFlagService) {
this.userService = userService;
this.cacheManager = cacheManager;
this.featureFlagService = featureFlagService;
}
@EventListener
public void handleUserChangeEvent(UserChangeEvent event) {
log.info("Processing user change event: {} for user: {}", 
event.getChangeType(), event.getUserId());
try {
switch (event.getChangeType()) {
case CREATED -> handleUserCreated(event);
case UPDATED -> handleUserUpdated(event);
case DELETED -> handleUserDeleted(event);
case PROFILE_CHANGED -> handleProfileChanged(event);
case ROLE_CHANGED -> handleRoleChanged(event);
}
// Send acknowledgement
sendAcknowledgment(event);
} catch (Exception e) {
log.error("Failed to process user change event: {}", event.getUserId(), e);
}
}
@EventListener
public void handleCacheInvalidationEvent(CacheInvalidationEvent event) {
log.info("Processing cache invalidation event for cache: {}", event.getCacheName());
try {
Cache cache = cacheManager.getCache(event.getCacheName());
if (cache != null) {
if (event.isClearAll()) {
cache.clear();
log.info("Cleared entire cache: {}", event.getCacheName());
} else {
event.getKeys().forEach(key -> {
cache.evict(key);
log.debug("Evicted key: {} from cache: {}", key, event.getCacheName());
});
log.info("Evicted {} keys from cache: {}", 
event.getKeys().size(), event.getCacheName());
}
} else {
log.warn("Cache not found: {}", event.getCacheName());
}
} catch (Exception e) {
log.error("Failed to process cache invalidation event", e);
}
}
@EventListener
public void handleFeatureFlagChangeEvent(FeatureFlagChangeEvent event) {
log.info("Processing feature flag change: {} -> {}", 
event.getFeatureName(), event.isEnabled());
try {
featureFlagService.updateFeatureFlag(
event.getFeatureName(), 
event.isEnabled(), 
event.getParameters()
);
log.info("Updated feature flag: {}", event.getFeatureName());
} catch (Exception e) {
log.error("Failed to process feature flag change event", e);
}
}
@EventListener
public void handleRefreshEvent(RefreshRemoteApplicationEvent event) {
log.info("Processing refresh event from: {}", event.getOriginService());
// Refresh configuration beans
refreshConfigurationBeans();
// Clear configuration caches
clearConfigurationCaches();
log.info("Configuration refresh completed");
}
@EventListener
public void handleEnvironmentChangeEvent(EnvironmentChangeRemoteApplicationEvent event) {
log.info("Processing environment change event with {} keys", event.getValues().size());
event.getValues().forEach((key, value) -> {
log.debug("Processing environment change - {}: {}", key, value);
handleSpecificEnvironmentChange(key, value);
});
}
// Private handler methods
private void handleUserCreated(UserChangeEvent event) {
// Invalidate user caches
cacheManager.getCache("users").evict(event.getUserId());
cacheManager.getCache("user-profiles").evict(event.getUserId());
// Update search index
updateUserSearchIndex(event.getUserId());
}
private void handleUserUpdated(UserChangeEvent event) {
// Invalidate user-related caches
cacheManager.getCache("users").evict(event.getUserId());
cacheManager.getCache("user-profiles").evict(event.getUserId());
cacheManager.getCache("user-permissions").evict(event.getUserId());
// Update search index
updateUserSearchIndex(event.getUserId());
}
private void handleUserDeleted(UserChangeEvent event) {
// Remove from caches
cacheManager.getCache("users").evict(event.getUserId());
cacheManager.getCache("user-profiles").evict(event.getUserId());
cacheManager.getCache("user-permissions").evict(event.getUserId());
// Remove from search index
removeUserFromSearchIndex(event.getUserId());
}
private void handleProfileChanged(UserChangeEvent event) {
cacheManager.getCache("user-profiles").evict(event.getUserId());
updateUserSearchIndex(event.getUserId());
}
private void handleRoleChanged(UserChangeEvent event) {
cacheManager.getCache("user-permissions").evict(event.getUserId());
cacheManager.getCache("user-roles").evict(event.getUserId());
}
private void updateUserSearchIndex(String userId) {
// Implementation for updating search index
log.debug("Updating search index for user: {}", userId);
}
private void removeUserFromSearchIndex(String userId) {
// Implementation for removing from search index
log.debug("Removing user from search index: {}", userId);
}
private void refreshConfigurationBeans() {
// Refresh @RefreshScope beans
// This is handled automatically by Spring Cloud
log.debug("Refreshing configuration beans");
}
private void clearConfigurationCaches() {
// Clear any custom configuration caches
cacheManager.getCache("configuration").clear();
log.debug("Cleared configuration caches");
}
private void handleSpecificEnvironmentChange(String key, String value) {
// Handle specific environment variable changes
switch (key) {
case "app.feature.new-ui" -> 
featureFlagService.updateFeatureFlag("new-ui", Boolean.parseBoolean(value), Map.of());
case "app.cache.ttl" -> 
updateCacheTtl(Integer.parseInt(value));
case "app.rate-limit.requests" -> 
updateRateLimit(Integer.parseInt(value));
}
}
private void updateCacheTtl(int ttl) {
log.info("Updated cache TTL to: {} seconds", ttl);
}
private void updateRateLimit(int limit) {
log.info("Updated rate limit to: {} requests", limit);
}
private void sendAcknowledgment(RemoteApplicationEvent event) {
// Send acknowledgment back to origin service
// This would typically be another bus event
log.debug("Sent acknowledgment for event: {}", event.getId());
}
}

Configuration Management with Bus

1. Dynamic Configuration Service

@Service
@Slf4j
@RefreshScope
public class DynamicConfigurationService {
@Value("${app.feature.new-ui:false}")
private boolean newUiEnabled;
@Value("${app.cache.ttl:300}")
private int cacheTtl;
@Value("${app.rate-limit.requests:100}")
private int rateLimitRequests;
@Value("${app.notification.enabled:true}")
private boolean notificationEnabled;
private final BusEventDispatcherService busEventDispatcher;
private final Map<String, Object> configurationCache = new ConcurrentHashMap<>();
public DynamicConfigurationService(BusEventDispatcherService busEventDispatcher) {
this.busEventDispatcher = busEventDispatcher;
initializeConfigurationCache();
}
/**
* Get current configuration
*/
public Map<String, Object> getCurrentConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put("newUiEnabled", newUiEnabled);
config.put("cacheTtl", cacheTtl);
config.put("rateLimitRequests", rateLimitRequests);
config.put("notificationEnabled", notificationEnabled);
return config;
}
/**
* Update configuration and broadcast changes
*/
public void updateConfiguration(Map<String, Object> newConfig) {
log.info("Updating configuration with {} changes", newConfig.size());
Map<String, String> environmentChanges = new HashMap<>();
// Update local configuration (in a real scenario, this would update the config server)
newConfig.forEach((key, value) -> {
String stringValue = value != null ? value.toString() : "";
environmentChanges.put(key, stringValue);
updateLocalConfiguration(key, value);
});
// Broadcast configuration changes to all services
if (!environmentChanges.isEmpty()) {
busEventDispatcher.broadcastEnvironmentChange(environmentChanges);
}
log.info("Configuration updated and broadcasted");
}
/**
* Refresh configuration from config server
*/
public void refreshConfiguration() {
log.info("Refreshing configuration from config server");
busEventDispatcher.broadcastEnvironmentRefresh();
}
/**
* Get configuration value with caching
*/
@SuppressWarnings("unchecked")
public <T> T getConfigValue(String key, T defaultValue) {
return (T) configurationCache.computeIfAbsent(key, k -> defaultValue);
}
/**
* Update configuration cache when environment changes
*/
@EventListener
public void onEnvironmentChange(EnvironmentChangeRemoteApplicationEvent event) {
log.info("Updating configuration cache from environment change event");
event.getValues().forEach((key, value) -> {
configurationCache.put(key, convertValue(key, value));
log.debug("Updated configuration cache: {} = {}", key, value);
});
}
@EventListener
public void onRefreshEvent(RefreshRemoteApplicationEvent event) {
log.info("Refreshing configuration cache");
configurationCache.clear();
initializeConfigurationCache();
}
// Private methods
private void initializeConfigurationCache() {
configurationCache.put("app.feature.new-ui", newUiEnabled);
configurationCache.put("app.cache.ttl", cacheTtl);
configurationCache.put("app.rate-limit.requests", rateLimitRequests);
configurationCache.put("app.notification.enabled", notificationEnabled);
}
private void updateLocalConfiguration(String key, Object value) {
switch (key) {
case "app.feature.new-ui" -> this.newUiEnabled = Boolean.parseBoolean(value.toString());
case "app.cache.ttl" -> this.cacheTtl = Integer.parseInt(value.toString());
case "app.rate-limit.requests" -> this.rateLimitRequests = Integer.parseInt(value.toString());
case "app.notification.enabled" -> this.notificationEnabled = Boolean.parseBoolean(value.toString());
}
configurationCache.put(key, value);
}
private Object convertValue(String key, String value) {
if (key.contains("enabled") || key.contains("feature.")) {
return Boolean.parseBoolean(value);
} else if (key.contains("ttl") || key.contains("limit") || key.contains("timeout")) {
return Integer.parseInt(value);
} else if (key.contains("interval") || key.contains("duration")) {
return Long.parseLong(value);
}
return value;
}
// Getters
public boolean isNewUiEnabled() { return newUiEnabled; }
public int getCacheTtl() { return cacheTtl; }
public int getRateLimitRequests() { return rateLimitRequests; }
public boolean isNotificationEnabled() { return notificationEnabled; }
}

REST API Controllers

1. Bus Management Controller

@RestController
@RequestMapping("/api/bus")
@Slf4j
public class BusManagementController {
private final BusEventDispatcherService busEventDispatcher;
private final DynamicConfigurationService configService;
private final BusEventHandlerService eventHandlerService;
public BusManagementController(BusEventDispatcherService busEventDispatcher,
DynamicConfigurationService configService,
BusEventHandlerService eventHandlerService) {
this.busEventDispatcher = busEventDispatcher;
this.configService = configService;
this.eventHandlerService = eventHandlerService;
}
@PostMapping("/refresh")
public ResponseEntity<Map<String, Object>> refreshConfiguration() {
log.info("Manual configuration refresh requested");
try {
busEventDispatcher.broadcastEnvironmentRefresh();
Map<String, Object> response = new HashMap<>();
response.put("status", "success");
response.put("message", "Configuration refresh broadcasted to all services");
response.put("timestamp", Instant.now());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to broadcast refresh event", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("status", "error", "message", e.getMessage()));
}
}
@PostMapping("/config/update")
public ResponseEntity<Map<String, Object>> updateConfiguration(
@RequestBody Map<String, Object> configUpdates) {
log.info("Configuration update requested: {}", configUpdates.keySet());
try {
configService.updateConfiguration(configUpdates);
Map<String, Object> response = new HashMap<>();
response.put("status", "success");
response.put("message", "Configuration updated and broadcasted");
response.put("updatedKeys", configUpdates.keySet());
response.put("timestamp", Instant.now());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to update configuration", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("status", "error", "message", e.getMessage()));
}
}
@GetMapping("/config/current")
public ResponseEntity<Map<String, Object>> getCurrentConfiguration() {
Map<String, Object> config = configService.getCurrentConfiguration();
return ResponseEntity.ok(config);
}
@PostMapping("/cache/invalidate")
public ResponseEntity<Map<String, Object>> invalidateCache(
@RequestBody CacheInvalidationRequest request) {
log.info("Cache invalidation requested: {}", request.getCacheName());
try {
busEventDispatcher.sendCacheInvalidation(
request.getDestinationService(),
request.getCacheName(),
request.getKeys(),
request.isClearAll()
);
Map<String, Object> response = new HashMap<>();
response.put("status", "success");
response.put("message", "Cache invalidation sent");
response.put("cacheName", request.getCacheName());
response.put("timestamp", Instant.now());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to send cache invalidation", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("status", "error", "message", e.getMessage()));
}
}
@PostMapping("/feature/update")
public ResponseEntity<Map<String, Object>> updateFeatureFlag(
@RequestBody FeatureFlagUpdateRequest request) {
log.info("Feature flag update requested: {} -> {}", 
request.getFeatureName(), request.isEnabled());
try {
busEventDispatcher.broadcastFeatureFlagChange(
request.getFeatureName(),
request.isEnabled(),
request.getParameters()
);
Map<String, Object> response = new HashMap<>();
response.put("status", "success");
response.put("message", "Feature flag update broadcasted");
response.put("featureName", request.getFeatureName());
response.put("enabled", request.isEnabled());
response.put("timestamp", Instant.now());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to update feature flag", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("status", "error", "message", e.getMessage()));
}
}
@PostMapping("/user/change")
public ResponseEntity<Map<String, Object>> broadcastUserChange(
@RequestBody UserChangeRequest request) {
log.info("User change broadcast requested: {} - {}", 
request.getUserId(), request.getChangeType());
try {
busEventDispatcher.broadcastUserChange(
request.getUserId(),
request.getChangeType(),
request.getChangeDetails()
);
Map<String, Object> response = new HashMap<>();
response.put("status", "success");
response.put("message", "User change broadcasted");
response.put("userId", request.getUserId());
response.put("changeType", request.getChangeType());
response.put("timestamp", Instant.now());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to broadcast user change", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("status", "error", "message", e.getMessage()));
}
}
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> busHealth() {
Map<String, Object> health = new HashMap<>();
health.put("status", "UP");
health.put("busEnabled", true);
health.put("timestamp", Instant.now());
health.put("serviceId", "user-service"); // This would be dynamic
return ResponseEntity.ok(health);
}
}
// Request DTOs
@Data
class CacheInvalidationRequest {
private String destinationService = "*";
private String cacheName;
private List<String> keys;
private boolean clearAll = false;
}
@Data
class FeatureFlagUpdateRequest {
private String featureName;
private boolean enabled;
private Map<String, Object> parameters;
}
@Data
class UserChangeRequest {
private String userId;
private UserChangeEvent.UserChangeType changeType;
private Map<String, Object> changeDetails;
}

Monitoring and Metrics

1. Bus Metrics Service

@Service
@Slf4j
public class BusMetricsService {
private final MeterRegistry meterRegistry;
private final Counter eventSentCounter;
private final Counter eventReceivedCounter;
private final Timer eventProcessingTimer;
private final Map<String, AtomicInteger> eventTypeCounters = new ConcurrentHashMap<>();
public BusMetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// Initialize metrics
this.eventSentCounter = Counter.builder("spring.cloud.bus.events.sent")
.description("Number of bus events sent")
.register(meterRegistry);
this.eventReceivedCounter = Counter.builder("spring.cloud.bus.events.received")
.description("Number of bus events received")
.register(meterRegistry);
this.eventProcessingTimer = Timer.builder("spring.cloud.bus.events.processing.time")
.description("Time taken to process bus events")
.register(meterRegistry);
initializeEventTypeMetrics();
}
/**
* Record event sent metric
*/
public void recordEventSent(String eventType, String destination) {
eventSentCounter.increment();
meterRegistry.counter("spring.cloud.bus.events.sent.by.type",
"event_type", eventType,
"destination", destination
).increment();
updateEventTypeCounter(eventType, "sent");
}
/**
* Record event received metric
*/
public void recordEventReceived(String eventType, String origin) {
eventReceivedCounter.increment();
meterRegistry.counter("spring.cloud.bus.events.received.by.type",
"event_type", eventType,
"origin", origin
).increment();
updateEventTypeCounter(eventType, "received");
}
/**
* Record event processing time
*/
public Timer.Sample startEventProcessingTimer() {
return Timer.start(meterRegistry);
}
public void stopEventProcessingTimer(Timer.Sample sample, String eventType, boolean success) {
sample.stop(eventProcessingTimer.tags(
"event_type", eventType,
"success", String.valueOf(success)
));
}
/**
* Get bus metrics summary
*/
public BusMetricsSummary getMetricsSummary() {
return BusMetricsSummary.builder()
.totalEventsSent(eventSentCounter.count())
.totalEventsReceived(eventReceivedCounter.count())
.eventTypeCounts(getEventTypeCounts())
.timestamp(Instant.now())
.build();
}
/**
* Event listener for bus events
*/
@EventListener
public void onBusEvent(RemoteApplicationEvent event) {
String eventType = event.getClass().getSimpleName();
recordEventReceived(eventType, event.getOriginService());
}
// Private methods
private void initializeEventTypeMetrics() {
List<String> eventTypes = List.of(
"RefreshRemoteApplicationEvent",
"EnvironmentChangeRemoteApplicationEvent",
"UserChangeEvent",
"CacheInvalidationEvent",
"FeatureFlagChangeEvent"
);
for (String eventType : eventTypes) {
eventTypeCounters.put(eventType + ".sent", new AtomicInteger(0));
eventTypeCounters.put(eventType + ".received", new AtomicInteger(0));
}
}
private void updateEventTypeCounter(String eventType, String direction) {
String counterKey = eventType + "." + direction;
eventTypeCounters.computeIfAbsent(counterKey, k -> new AtomicInteger(0))
.incrementAndGet();
}
private Map<String, Integer> getEventTypeCounts() {
return eventTypeCounters.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().get()
));
}
}
@Data
@Builder
class BusMetricsSummary {
private double totalEventsSent;
private double totalEventsReceived;
private Map<String, Integer> eventTypeCounts;
private Instant timestamp;
}

2. Bus Health Indicator

@Component
@Slf4j
public class BusHealthIndicator implements HealthIndicator {
private final BusMetricsService metricsService;
private final BusProperties busProperties;
public BusHealthIndicator(BusMetricsService metricsService,
BusProperties busProperties) {
this.metricsService = metricsService;
this.busProperties = busProperties;
}
@Override
public Health health() {
Health.Builder healthBuilder = Health.up();
try {
BusMetricsSummary metrics = metricsService.getMetricsSummary();
healthBuilder
.withDetail("busEnabled", busProperties.isEnabled())
.withDetail("totalEventsSent", metrics.getTotalEventsSent())
.withDetail("totalEventsReceived", metrics.getTotalEventsReceived())
.withDetail("eventTypes", metrics.getEventTypeCounts().keySet())
.withDetail("lastCheck", metrics.getTimestamp());
// Check if bus is functioning properly
if (metrics.getTotalEventsSent() == 0 && metrics.getTotalEventsReceived() == 0) {
healthBuilder.down().withDetail("reason", "No bus activity detected");
}
} catch (Exception e) {
healthBuilder.down(e);
}
return healthBuilder.build();
}
}

Advanced Features

1. Conditional Event Routing

@Service
@Slf4j
public class ConditionalEventRouter {
private final BusEventDispatcherService busEventDispatcher;
private final ServiceRegistry serviceRegistry;
public ConditionalEventRouter(BusEventDispatcherService busEventDispatcher,
ServiceRegistry serviceRegistry) {
this.busEventDispatcher = busEventDispatcher;
this.serviceRegistry = serviceRegistry;
}
/**
* Route event based on conditions
*/
public void routeEvent(ConditionalEventRequest request) {
log.info("Routing conditional event: {}", request.getEventType());
List<String> targetServices = determineTargetServices(request);
if (targetServices.isEmpty()) {
log.warn("No target services found for event: {}", request.getEventType());
return;
}
for (String service : targetServices) {
sendEventToService(service, request);
}
}
/**
* Route event to services matching specific criteria
*/
public void routeToMatchingServices(ServiceMatchingCriteria criteria, RemoteApplicationEvent event) {
List<String> matchingServices = serviceRegistry.findServicesMatching(criteria);
for (String service : matchingServices) {
// In a real implementation, you would send the event to the specific service
log.info("Routing event to matching service: {}", service);
}
}
private List<String> determineTargetServices(ConditionalEventRequest request) {
List<String> services = new ArrayList<>();
// Example routing logic
switch (request.getEventType()) {
case "USER_DATA_CHANGE" -> 
services.addAll(serviceRegistry.getServicesByType("user-service"));
case "ORDER_UPDATE" -> 
services.addAll(serviceRegistry.getServicesByType("order-service"));
case "CACHE_INVALIDATION" -> 
services.addAll(serviceRegistry.getAllServices());
case "CONFIG_CHANGE" -> 
services.addAll(serviceRegistry.getServicesByEnvironment(request.getEnvironment()));
}
// Apply additional filters
if (request.getRegion() != null) {
services.retainAll(serviceRegistry.getServicesByRegion(request.getRegion()));
}
return services;
}
private void sendEventToService(String service, ConditionalEventRequest request) {
// Implementation depends on the specific event type
log.debug("Sending event to service: {} - {}", service, request.getEventType());
}
}
@Data
class ConditionalEventRequest {
private String eventType;
private String environment;
private String region;
private Map<String, Object> parameters;
}
@Data
class ServiceMatchingCriteria {
private String serviceType;
private String environment;
private String region;
private String version;
private Map<String, String> labels;
}

Testing

1. Bus Integration Tests

@SpringBootTest
@ActiveProfiles("test")
@TestPropertySource(properties = {
"spring.cloud.bus.enabled=true",
"spring.rabbitmq.port=5672",
"management.endpoints.web.exposure.include=*"
})
@DirtiesContext
class SpringCloudBusIntegrationTest {
@Autowired
private BusEventDispatcherService busEventDispatcher;
@Autowired
private DynamicConfigurationService configService;
@Autowired
private ApplicationContext applicationContext;
@MockBean
private UserService userService;
@Test
void shouldBroadcastRefreshEvent() {
// Given
ApplicationEventPublisher publisher = applicationContext;
// When
busEventDispatcher.broadcastEnvironmentRefresh();
// Then - Verify that the event was published
// In a real test, you would use an event captor or verify side effects
assertTrue(true); // Placeholder assertion
}
@Test
void shouldHandleConfigurationUpdate() {
// Given
Map<String, Object> configUpdates = Map.of(
"app.feature.new-ui", true,
"app.cache.ttl", 600
);
// When
configService.updateConfiguration(configUpdates);
// Then
assertTrue(configService.isNewUiEnabled());
assertEquals(600, configService.getCacheTtl());
}
@Test
void shouldProcessUserChangeEvent() {
// Given
UserChangeEvent event = new UserChangeEvent(
this, "test-service", "*", "user123", 
UserChangeEvent.UserChangeType.UPDATED, Map.of("field", "email")
);
ApplicationEventPublisher publisher = applicationContext;
// When
publisher.publishEvent(event);
// Then - Verify that user service was called appropriately
// This would depend on your specific implementation
verify(userService, timeout(5000)).handleUserUpdate("user123");
}
}
// Test configuration
@TestConfiguration
class TestBusConfiguration {
@Bean
@Primary
public ServiceRegistry testServiceRegistry() {
return new TestServiceRegistry();
}
@Bean
@Primary
public UserService testUserService() {
return mock(UserService.class);
}
}
class TestServiceRegistry implements ServiceRegistry {
@Override
public List<String> getServicesByType(String serviceType) {
return List.of(serviceType + "-1", serviceType + "-2");
}
@Override
public List<String> getServicesByEnvironment(String environment) {
return List.of("user-service", "order-service", "payment-service");
}
@Override
public List<String> getServicesByRegion(String region) {
return List.of("user-service-" + region, "order-service-" + region);
}
@Override
public List<String> getAllServices() {
return List.of("user-service", "order-service", "payment-service", "inventory-service");
}
@Override
public List<String> findServicesMatching(ServiceMatchingCriteria criteria) {
// Simplified implementation for testing
return getServicesByType(criteria.getServiceType());
}
}

Production Configuration

1. Production-ready Application Properties

# application-prod.yml
spring:
cloud:
bus:
enabled: true
# RabbitMQ Production Configuration
rabbitmq:
host: ${RABBITMQ_HOST:rabbitmq-cluster}
port: ${RABBITMQ_PORT:5672}
username: ${RABBITMQ_USERNAME:admin}
password: ${RABBITMQ_PASSWORD:}
virtual-host: ${RABBITMQ_VHOST:/}
ssl:
enabled: true
connection-timeout: 10000
# High availability settings
addresses: ${RABBITMQ_ADDRESSES:rabbitmq-1:5672,rabbitmq-2:5672,rabbitmq-3:5672}
# Alternatively, Kafka Production Configuration
# kafka:
#   bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka-1:9092,kafka-2:9092,kafka-3:9092}
#   producer:
#     acks: all
#     retries: 3
#   consumer:
#     group-id: spring-cloud-bus-${spring.application.name}
#     auto-offset-reset: latest
# Actuator Security
management:
endpoints:
web:
exposure:
include: health,info,metrics,busrefresh
base-path: /internal
endpoint:
health:
show-details: when-authorized
show-components: when-authorized
busrefresh:
enabled: true
info:
env:
enabled: true
# Security
spring:
security:
user:
name: actuator
password: ${ACTUATOR_PASSWORD:}
# Logging
logging:
level:
org.springframework.cloud.bus: INFO
com.example.bus: INFO
pattern:
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
# Custom Bus Properties
app:
bus:
max-retry-attempts: 3
retry-delay-ms: 1000
event-timeout-ms: 30000
enabled-events:
- REFRESH
- ENV_CHANGE
- CACHE_INVALIDATION
- USER_CHANGE

Conclusion

Spring Cloud Bus provides powerful capabilities for distributed system coordination:

  1. Configuration Management - Broadcast configuration changes across all service instances
  2. Event-Driven Architecture - Enable loose coupling between microservices
  3. Cache Coordination - Synchronize cache invalidation across instances
  4. Feature Flag Management - Dynamically control feature availability
  5. Service Coordination - Coordinate actions across multiple service instances

Key benefits for Java applications:

  • Real-time configuration updates without service restarts
  • Centralized event distribution through message brokers
  • Scalable architecture that works with hundreds of service instances
  • Flexible event routing with conditional targeting
  • Comprehensive monitoring with metrics and health checks

By implementing Spring Cloud Bus with the patterns shown above, Java microservices can achieve sophisticated coordination and real-time configuration management while maintaining loose coupling and high availability.

Java Logistics, Shipping Integration & Enterprise Inventory Automation (Tracking, ERP, RFID & Billing Systems)

https://macronepal.com/blog/aftership-tracking-in-java-enterprise-package-visibility/
Explains how to integrate AfterShip tracking services into Java applications to provide real-time shipment visibility, delivery status updates, and centralized tracking across multiple courier services.

https://macronepal.com/blog/shipping-integration-using-fedex-api-with-java-for-logistics-automation/
Explains how to integrate the FedEx API into Java systems to automate shipping tasks such as creating shipments, calculating delivery costs, generating shipping labels, and tracking packages.

https://macronepal.com/blog/shipping-and-logistics-integrating-ups-apis-with-java-applications/
Explains UPS API integration in Java to enable automated shipping operations including rate calculation, shipment scheduling, tracking, and delivery confirmation management.

https://macronepal.com/blog/generating-and-reading-qr-codes-for-products-in-java/
Explains how Java applications generate and read QR codes for product identification, tracking, and authentication, supporting faster inventory handling and product verification processes.

https://macronepal.com/blog/designing-a-robust-pick-and-pack-workflow-in-java/
Explains how to design an efficient pick-and-pack workflow in Java warehouse systems, covering order processing, item selection, packaging steps, and logistics preparation to improve fulfillment efficiency.

https://macronepal.com/blog/rfid-inventory-management-system-in-java-a-complete-guide/
Explains how RFID technology integrates with Java applications to automate inventory tracking, reduce manual errors, and enable real-time stock monitoring in warehouses and retail environments.

https://macronepal.com/blog/erp-integration-with-odoo-in-java/
Explains how Java applications connect with Odoo ERP systems to synchronize inventory, orders, customer records, and financial data across enterprise systems.

https://macronepal.com/blog/automated-invoice-generation-creating-professional-excel-invoices-with-apache-poi-in-java/
Explains how to automatically generate professional Excel invoices in Java using Apache POI, enabling structured billing documents and automated financial record creation.

https://macronepal.com/blog/enterprise-financial-integration-using-quickbooks-api-in-java-applications/
Explains QuickBooks API integration in Java to automate financial workflows such as invoice management, payment tracking, accounting synchronization, and financial reporting.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper