Middleware.io Integration in Java: Complete Guide for Real-time Data Processing

Middleware.io is a real-time data processing platform that connects data sources to destinations. This guide walks through Java integration patterns: an HTTP client, event builders, asynchronous and batched dispatch, Spring Boot wiring, monitoring, and testing.


Core Concepts

What is Middleware.io?

  • Cloud-native real-time data pipeline platform
  • Supports 140+ integrations out of the box
  • Enables event streaming, ETL, and data transformation
  • Provides both cloud and self-hosted options

Key Features:

  • Real-time data streaming
  • Data transformation and enrichment
  • Event-driven architecture support
  • Scalable data pipelines
  • Monitoring and observability

Dependencies and Setup

Maven Dependencies
<properties>
<middleware.version>1.0.0</middleware.version>
<spring-boot.version>3.1.0</spring-boot.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<!-- Middleware Java SDK -->
<dependency>
<groupId>io.middleware</groupId>
<artifactId>middleware-java-sdk</artifactId>
<version>${middleware.version}</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- HTTP Client -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
</dependency>
</dependencies>
Gradle Dependencies
dependencies {
implementation 'io.middleware:middleware-java-sdk:1.0.0'
implementation 'org.springframework.boot:spring-boot-starter-web:3.1.0'
implementation 'org.springframework.boot:spring-boot-starter-validation:3.1.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
}

Core Implementation

1. Middleware Configuration
@Configuration
@ConfigurationProperties(prefix = "middleware")
public class MiddlewareConfig {
private String apiKey;
private String sourceId;
private String baseUrl = "https://api.middleware.io";
private int timeout = 30000;
private int maxRetries = 3;
private boolean enabled = true;
// Getters and setters
public String getApiKey() { return apiKey; }
public void setApiKey(String apiKey) { this.apiKey = apiKey; }
public String getSourceId() { return sourceId; }
public void setSourceId(String sourceId) { this.sourceId = sourceId; }
public String getBaseUrl() { return baseUrl; }
public void setBaseUrl(String baseUrl) { this.baseUrl = baseUrl; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout; }
public int getMaxRetries() { return maxRetries; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
}
2. HTTP Client Configuration
@Configuration
public class HttpClientConfig {
@Bean
public CloseableHttpClient httpClient(MiddlewareConfig config) {
return HttpClients.custom()
.setConnectionManager(new PoolingHttpClientConnectionManager())
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(config.getTimeout(), TimeUnit.MILLISECONDS)
.setConnectionRequestTimeout(config.getTimeout(), TimeUnit.MILLISECONDS)
// HttpClient 5 has no setSocketTimeout; the read timeout is setResponseTimeout
.setResponseTimeout(config.getTimeout(), TimeUnit.MILLISECONDS)
.build())
.build();
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
}
3. Middleware Client
@Component
public class MiddlewareClient {
private final MiddlewareConfig config;
private final CloseableHttpClient httpClient;
private final ObjectMapper objectMapper;
private static final Logger logger = LoggerFactory.getLogger(MiddlewareClient.class);
private static final String TRACK_ENDPOINT = "/v1/track";
private static final String IDENTIFY_ENDPOINT = "/v1/identify";
private static final String BATCH_ENDPOINT = "/v1/batch";
public MiddlewareClient(MiddlewareConfig config, CloseableHttpClient httpClient, 
ObjectMapper objectMapper) {
this.config = config;
this.httpClient = httpClient;
this.objectMapper = objectMapper;
}
public void trackEvent(Event event) {
if (!config.isEnabled()) {
logger.debug("Middleware is disabled, skipping event tracking");
return;
}
try {
String payload = objectMapper.writeValueAsString(event);
sendRequest(TRACK_ENDPOINT, payload);
logger.debug("Event tracked successfully: {}", event.getEvent());
} catch (Exception e) {
logger.error("Failed to track event: {}", event.getEvent(), e);
throw new MiddlewareException("Event tracking failed", e);
}
}
public void identifyUser(UserIdentity identity) {
if (!config.isEnabled()) {
return;
}
try {
String payload = objectMapper.writeValueAsString(identity);
sendRequest(IDENTIFY_ENDPOINT, payload);
logger.debug("User identified: {}", identity.getUserId());
} catch (Exception e) {
logger.error("Failed to identify user: {}", identity.getUserId(), e);
throw new MiddlewareException("User identification failed", e);
}
}
public void trackBatch(List<Event> events) {
if (!config.isEnabled() || events.isEmpty()) {
return;
}
try {
BatchRequest batchRequest = new BatchRequest(events);
String payload = objectMapper.writeValueAsString(batchRequest);
sendRequest(BATCH_ENDPOINT, payload);
logger.debug("Batch of {} events tracked successfully", events.size());
} catch (Exception e) {
logger.error("Failed to track batch of {} events", events.size(), e);
throw new MiddlewareException("Batch event tracking failed", e);
}
}
// EntityUtils.toString declares ParseException in HttpClient 5, so it is added to the throws clause
private void sendRequest(String endpoint, String payload) throws IOException, ParseException {
HttpPost request = new HttpPost(config.getBaseUrl() + endpoint);
request.setHeader("Content-Type", "application/json");
request.setHeader("Authorization", "Bearer " + config.getApiKey());
request.setHeader("X-Source-ID", config.getSourceId());
request.setEntity(new StringEntity(payload, StandardCharsets.UTF_8));
try (CloseableHttpResponse response = httpClient.execute(request)) {
// HttpClient 5 exposes the status code via getCode(); getStatusLine() is the 4.x API
int statusCode = response.getCode();
if (statusCode < 200 || statusCode >= 300) {
String responseBody = response.getEntity() != null
? EntityUtils.toString(response.getEntity()) : "";
throw new MiddlewareException(
String.format("HTTP %d: %s", statusCode, responseBody));
}
// Consume the body so the pooled connection can be reused
EntityUtils.consume(response.getEntity());
}
}
}
public class MiddlewareException extends RuntimeException {
public MiddlewareException(String message) {
super(message);
}
public MiddlewareException(String message, Throwable cause) {
super(message, cause);
}
}
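
For projects that would rather not add Apache HttpClient, the same request can be sketched with the JDK's built-in `java.net.http` API. This is a swapped-in alternative, not the client used above. The `/v1/track` path and the header names are copied from `MiddlewareClient` and have not been checked against any published API; `TrackRequestFactory` is a hypothetical helper name.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

// Hypothetical helper: builds (but does not send) a track request.
class TrackRequestFactory {
    static HttpRequest build(String baseUrl, String apiKey, String sourceId, String jsonPayload) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/v1/track"))
                .timeout(Duration.ofSeconds(30)) // mirrors the 30000 ms default in MiddlewareConfig
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + apiKey)
                .header("X-Source-ID", sourceId)
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload, StandardCharsets.UTF_8))
                .build();
    }
}
```

The request would then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the status code checked in the same way as in `sendRequest`.
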
4. Data Models
public class Event {
private String event;
private String userId;
private String anonymousId;
private Map<String, Object> properties;
private Map<String, Object> context;
private Instant timestamp;
// Constructors
public Event() {}
public Event(String event, String userId, Map<String, Object> properties) {
this.event = event;
this.userId = userId;
this.properties = properties;
this.timestamp = Instant.now();
this.context = new HashMap<>();
}
// Getters and setters
public String getEvent() { return event; }
public void setEvent(String event) { this.event = event; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getAnonymousId() { return anonymousId; }
public void setAnonymousId(String anonymousId) { this.anonymousId = anonymousId; }
public Map<String, Object> getProperties() { return properties; }
public void setProperties(Map<String, Object> properties) { this.properties = properties; }
public Map<String, Object> getContext() { return context; }
public void setContext(Map<String, Object> context) { this.context = context; }
public Instant getTimestamp() { return timestamp; }
public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
// Fluent builder methods
public Event withProperty(String key, Object value) {
if (this.properties == null) {
this.properties = new HashMap<>();
}
this.properties.put(key, value);
return this;
}
public Event withContext(String key, Object value) {
if (this.context == null) {
this.context = new HashMap<>();
}
this.context.put(key, value);
return this;
}
}
public class UserIdentity {
private String userId;
private String anonymousId;
private Map<String, Object> traits;
private Map<String, Object> context;
private Instant timestamp;
// Constructors, getters, setters
public UserIdentity(String userId, Map<String, Object> traits) {
this.userId = userId;
this.traits = traits;
this.timestamp = Instant.now();
this.context = new HashMap<>();
}
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getAnonymousId() { return anonymousId; }
public void setAnonymousId(String anonymousId) { this.anonymousId = anonymousId; }
public Map<String, Object> getTraits() { return traits; }
public void setTraits(Map<String, Object> traits) { this.traits = traits; }
public Map<String, Object> getContext() { return context; }
public void setContext(Map<String, Object> context) { this.context = context; }
public Instant getTimestamp() { return timestamp; }
public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
// Fluent builder
public UserIdentity withTrait(String key, Object value) {
if (this.traits == null) {
this.traits = new HashMap<>();
}
this.traits.put(key, value);
return this;
}
}
public class BatchRequest {
private List<Event> events;
public BatchRequest(List<Event> events) {
this.events = events;
}
// Getters and setters
public List<Event> getEvents() { return events; }
public void setEvents(List<Event> events) { this.events = events; }
}
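
One pitfall in the model above: the three-argument `Event` constructor stores the caller's map as-is, so passing an immutable `Map.of(...)` and then calling `withProperty` fails with `UnsupportedOperationException`. A minimal sketch of a defensive-copy variant follows; `CopyingEvent` is a hypothetical, trimmed-down stand-in for `Event`, not part of the original model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Event, reduced to the fields needed for the illustration.
class CopyingEvent {
    private final String event;
    private final String userId;
    private final Map<String, Object> properties;

    CopyingEvent(String event, String userId, Map<String, Object> properties) {
        this.event = event;
        this.userId = userId;
        // Copy into a mutable map so later withProperty calls are safe
        // even when the caller passed Map.of(...) or null.
        this.properties = properties == null ? new HashMap<>() : new HashMap<>(properties);
    }

    CopyingEvent withProperty(String key, Object value) {
        properties.put(key, value);
        return this;
    }

    String getEvent() { return event; }
    String getUserId() { return userId; }
    Map<String, Object> getProperties() { return properties; }
}
```

The copy also stops later changes to the caller's map from leaking into an event that is already queued.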

Event Builder Pattern

1. Event Builder Factory
@Component
public class EventBuilder {
private final MiddlewareConfig config;
public EventBuilder(MiddlewareConfig config) {
this.config = config;
}
public Event buildUserEvent(String userId, String eventName, Map<String, Object> properties) {
Event event = new Event(eventName, userId, properties);
event.withContext("source", config.getSourceId())
.withContext("library", "middleware-java-sdk")
.withContext("app_version", getAppVersion());
return event;
}
public Event buildPageViewEvent(String userId, String pageName, Map<String, Object> pageProperties) {
Map<String, Object> properties = new HashMap<>();
properties.put("page_name", pageName);
properties.put("page_url", pageProperties.get("url"));
properties.put("referrer", pageProperties.get("referrer"));
return buildUserEvent(userId, "page_viewed", properties);
}
public Event buildProductViewedEvent(String userId, String productId, String productName) {
Map<String, Object> properties = Map.of(
"product_id", productId,
"product_name", productName,
"category", "electronics"
);
return buildUserEvent(userId, "product_viewed", properties);
}
public Event buildOrderCompletedEvent(String userId, String orderId, BigDecimal amount, 
List<String> productIds) {
Map<String, Object> properties = Map.of(
"order_id", orderId,
"amount", amount,
"currency", "USD",
"products", productIds
);
return buildUserEvent(userId, "order_completed", properties);
}
public Event buildCustomEvent(String userId, String eventName, String entityType, 
String entityId, Map<String, Object> customProperties) {
Map<String, Object> properties = new HashMap<>();
properties.put("entity_type", entityType);
properties.put("entity_id", entityId);
if (customProperties != null) {
properties.putAll(customProperties);
}
return buildUserEvent(userId, eventName, properties);
}
private String getAppVersion() {
// getImplementationVersion() returns null when the JAR manifest has no Implementation-Version
Package pkg = getClass().getPackage();
String version = pkg != null ? pkg.getImplementationVersion() : null;
return version != null ? version : "unknown";
}
}
2. Context Enricher
@Component
public class EventEnricher {
public Event enrichWithHttpContext(Event event, HttpServletRequest request) {
return event.withContext("ip", getClientIp(request))
.withContext("user_agent", request.getHeader("User-Agent"))
.withContext("url", request.getRequestURL().toString())
.withContext("method", request.getMethod());
}
public Event enrichWithGeoData(Event event, String ipAddress) {
// This could integrate with a geoIP service
Map<String, Object> location = Map.of(
"ip", ipAddress,
"country", "US", // Would come from geoIP lookup
"city", "San Francisco"
);
return event.withContext("location", location);
}
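public Event enrichWithDeviceContext(Event event, String userAgent) {
// HashMap rather than Map.of: a missing User-Agent header yields null, which Map.of rejects
Map<String, Object> device = new HashMap<>();
device.put("user_agent", userAgent);
device.put("type", parseDeviceType(userAgent));
device.put("browser", parseBrowser(userAgent));
return event.withContext("device", device);
}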
private String getClientIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0];
}
return request.getRemoteAddr();
}
private String parseDeviceType(String userAgent) {
if (userAgent == null) return "unknown";
userAgent = userAgent.toLowerCase();
if (userAgent.contains("mobile")) return "mobile";
if (userAgent.contains("tablet")) return "tablet";
return "desktop";
}
private String parseBrowser(String userAgent) {
if (userAgent == null) return "unknown";
userAgent = userAgent.toLowerCase();
// Check the most specific token first: Edge user agents also contain "chrome" and "safari",
// and Chrome user agents also contain "safari"
if (userAgent.contains("edg")) return "edge";
if (userAgent.contains("firefox")) return "firefox";
if (userAgent.contains("chrome")) return "chrome";
if (userAgent.contains("safari")) return "safari";
return "unknown";
}
}
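
The `X-Forwarded-For` handling in `getClientIp` can be isolated into a small pure function, which makes it easier to test. The sketch below is illustrative (`ClientIpResolver` is a hypothetical name). It skips empty entries and trims whitespace. Note that clients can set this header themselves, so the result is only trustworthy behind a proxy that overwrites it.

```java
// Hypothetical helper mirroring getClientIp, without the servlet dependency.
class ClientIpResolver {
    static String resolve(String xForwardedFor, String remoteAddr) {
        if (xForwardedFor != null) {
            // The header is a comma-separated list; the left-most entry is the original client.
            for (String part : xForwardedFor.split(",")) {
                String candidate = part.trim();
                if (!candidate.isEmpty()) {
                    return candidate;
                }
            }
        }
        // No usable header value: fall back to the socket's peer address.
        return remoteAddr;
    }
}
```
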

Async Event Processing

1. Async Event Dispatcher
@Component
public class AsyncEventDispatcher {
private final MiddlewareClient middlewareClient;
private final BlockingQueue<Event> eventQueue;
private final ExecutorService executorService;
private volatile boolean running = true;
private static final Logger logger = LoggerFactory.getLogger(AsyncEventDispatcher.class);
public AsyncEventDispatcher(MiddlewareClient middlewareClient) {
this.middlewareClient = middlewareClient;
this.eventQueue = new LinkedBlockingQueue<>(10000);
this.executorService = Executors.newFixedThreadPool(3, r -> {
Thread t = new Thread(r, "middleware-dispatcher");
t.setDaemon(true);
return t;
});
startConsumers();
}
public void dispatch(Event event) {
if (!eventQueue.offer(event)) {
logger.warn("Event queue is full, dropping event: {}", event.getEvent());
// Could implement fallback strategy
}
}
public void dispatchBatch(List<Event> events) {
for (Event event : events) {
dispatch(event);
}
}
private void startConsumers() {
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
while (running || !eventQueue.isEmpty()) {
try {
Event event = eventQueue.poll(1, TimeUnit.SECONDS);
if (event != null) {
processEvent(event);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Error processing event from queue", e);
}
}
});
}
}
private void processEvent(Event event) {
try {
middlewareClient.trackEvent(event);
} catch (Exception e) {
logger.error("Failed to send event to Middleware: {}", event.getEvent(), e);
// Implement retry logic or dead letter queue
}
}
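// Called on context shutdown so queued events are drained; the daemon workers would otherwise die with the JVM
@PreDestroy
public void shutdown() {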
running = false;
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public int getQueueSize() {
return eventQueue.size();
}
public Map<String, Object> getMetrics() {
return Map.of(
"queue_size", eventQueue.size(),
"active_threads", ((ThreadPoolExecutor) executorService).getActiveCount(),
"completed_tasks", ((ThreadPoolExecutor) executorService).getCompletedTaskCount()
);
}
}
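
`processEvent` leaves retry handling as a comment, and the `maxRetries` setting in `MiddlewareConfig` is never read. One way to fill that gap is a retry loop with exponential backoff. The following is a sketch under assumptions, not the guide's implementation: `RetryingSender` and its parameters are hypothetical names, and the delay doubles after each failure with no jitter.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries an action up to maxRetries times after the first failure.
class RetryingSender {
    static <T> T callWithRetry(Callable<T> action, int maxRetries, long initialDelayMillis)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Never retry an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
        throw last; // all attempts failed: surface the final error
    }
}
```

Inside `processEvent`, a call such as `RetryingSender.callWithRetry(() -> { middlewareClient.trackEvent(event); return null; }, 3, 200)` would make up to four attempts before the event is logged as failed or handed to a dead letter queue.
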
2. Batch Event Processor
@Component
public class BatchEventProcessor {
private final MiddlewareClient middlewareClient;
private final List<Event> batchBuffer;
private final int batchSize;
private final ScheduledExecutorService scheduler;
private static final Logger logger = LoggerFactory.getLogger(BatchEventProcessor.class);
public BatchEventProcessor(MiddlewareClient middlewareClient) {
this.middlewareClient = middlewareClient;
this.batchBuffer = Collections.synchronizedList(new ArrayList<>());
this.batchSize = 100; // Configurable
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "batch-processor");
t.setDaemon(true);
return t;
});
startBatchFlusher();
}
public void addEvent(Event event) {
batchBuffer.add(event);
if (batchBuffer.size() >= batchSize) {
flushBatch();
}
}
private void startBatchFlusher() {
// Flush every 30 seconds
scheduler.scheduleAtFixedRate(this::flushBatch, 30, 30, TimeUnit.SECONDS);
}
private void flushBatch() {
if (batchBuffer.isEmpty()) {
return;
}
List<Event> eventsToSend;
synchronized (batchBuffer) {
eventsToSend = new ArrayList<>(batchBuffer);
batchBuffer.clear();
}
if (!eventsToSend.isEmpty()) {
try {
middlewareClient.trackBatch(eventsToSend);
logger.info("Successfully sent batch of {} events", eventsToSend.size());
} catch (Exception e) {
logger.error("Failed to send batch of {} events", eventsToSend.size(), e);
// Could implement retry logic
}
}
}
@PreDestroy // run on context shutdown so buffered events are not lost
public void shutdown() {
scheduler.shutdown();
flushBatch(); // Flush remaining events
}
}
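
In `addEvent`, the `add` call and the size check are separate steps, so two threads can both see a full buffer and both call `flushBatch`. A possible alternative is to make "add and maybe drain" one atomic operation. The sketch below assumes a hypothetical `BatchBuffer` class and leaves the actual sending to the caller.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer: adding and draining happen under one lock.
class BatchBuffer<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Adds an item; returns a full batch to send, or an empty list if the batch is not full yet. */
    synchronized List<T> add(T item) {
        buffer.add(item);
        return buffer.size() >= batchSize ? drainLocked() : List.of();
    }

    /** Removes and returns everything buffered so far (for the timed flush and shutdown). */
    synchronized List<T> drain() {
        return drainLocked();
    }

    // Caller must hold the lock.
    private List<T> drainLocked() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}
```

The caller sends whatever non-empty list comes back, outside the lock, so a slow HTTP call does not block other threads from buffering events.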

Spring Boot Integration

1. Configuration Properties
# application.yml
middleware:
  api-key: ${MIDDLEWARE_API_KEY:test-key}
  source-id: ${MIDDLEWARE_SOURCE_ID:java-app}
  base-url: ${MIDDLEWARE_BASE_URL:https://api.middleware.io}
  timeout: 30000
  max-retries: 3
  enabled: ${MIDDLEWARE_ENABLED:true}
  # Async configuration
  async:
    enabled: true
    queue-size: 10000
    batch-size: 100
    flush-interval: 30s
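
The `test-key` fallback means a missing `MIDDLEWARE_API_KEY` goes unnoticed until the API rejects requests. Outside local development it may be safer to fail at startup. A minimal sketch of such a check follows; `RequiredSettings` is a hypothetical helper, and it takes the environment as a map so it can be tested without touching real variables.

```java
import java.util.Map;

// Hypothetical fail-fast lookup for mandatory settings.
class RequiredSettings {
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }
}
```

In an application this would be called as `RequiredSettings.require(System.getenv(), "MIDDLEWARE_API_KEY")`.
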
2. Auto-Configuration
@Configuration
@EnableConfigurationProperties(MiddlewareProperties.class)
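// matchIfMissing keeps the configuration active when the property is absent, matching the enabled = true default
@ConditionalOnProperty(name = "middleware.enabled", havingValue = "true", matchIfMissing = true)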
public class MiddlewareAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MiddlewareConfig middlewareConfig(MiddlewareProperties properties) {
MiddlewareConfig config = new MiddlewareConfig();
config.setApiKey(properties.getApiKey());
config.setSourceId(properties.getSourceId());
config.setBaseUrl(properties.getBaseUrl());
config.setTimeout(properties.getTimeout());
config.setMaxRetries(properties.getMaxRetries());
config.setEnabled(properties.isEnabled());
return config;
}
@Bean
@ConditionalOnMissingBean
public MiddlewareClient middlewareClient(MiddlewareConfig config, 
CloseableHttpClient httpClient,
ObjectMapper objectMapper) {
return new MiddlewareClient(config, httpClient, objectMapper);
}
@Bean
@ConditionalOnMissingBean
public EventBuilder eventBuilder(MiddlewareConfig config) {
return new EventBuilder(config);
}
@Bean
@ConditionalOnMissingBean
public EventEnricher eventEnricher() {
return new EventEnricher();
}
@Bean
@ConditionalOnProperty(name = "middleware.async.enabled", havingValue = "true")
public AsyncEventDispatcher asyncEventDispatcher(MiddlewareClient middlewareClient) {
return new AsyncEventDispatcher(middlewareClient);
}
@Bean
@ConditionalOnProperty(name = "middleware.async.enabled", havingValue = "true")
public BatchEventProcessor batchEventProcessor(MiddlewareClient middlewareClient) {
return new BatchEventProcessor(middlewareClient);
}
}
@ConfigurationProperties(prefix = "middleware")
public class MiddlewareProperties {
private String apiKey;
private String sourceId;
private String baseUrl = "https://api.middleware.io";
private int timeout = 30000;
private int maxRetries = 3;
private boolean enabled = true;
private Async async = new Async();
// Getters and setters
public static class Async {
private boolean enabled = true;
private int queueSize = 10000;
private int batchSize = 100;
private Duration flushInterval = Duration.ofSeconds(30);
// getters and setters
}
}
3. Service Layer Integration
@Service
@Transactional
public class UserService {
// UserRepository is the application's own repository (assumed here, e.g. a Spring Data interface)
private final UserRepository userRepository;
private final AsyncEventDispatcher eventDispatcher;
private final EventBuilder eventBuilder;
private final EventEnricher eventEnricher;
private static final Logger logger = LoggerFactory.getLogger(UserService.class);
public UserService(UserRepository userRepository, AsyncEventDispatcher eventDispatcher,
EventBuilder eventBuilder, EventEnricher eventEnricher) {
this.userRepository = userRepository;
this.eventDispatcher = eventDispatcher;
this.eventBuilder = eventBuilder;
this.eventEnricher = eventEnricher;
}
public User createUser(CreateUserRequest request, HttpServletRequest httpRequest) {
logger.info("Creating user: {}", request.getEmail());
// Track user creation event
Event userCreatedEvent = eventBuilder.buildUserEvent(
null, // No user ID yet
"user_created",
Map.of(
"email", request.getEmail(),
"source", request.getSource(),
"plan", request.getPlan()
)
);
// Enrich with HTTP context
userCreatedEvent = eventEnricher.enrichWithHttpContext(userCreatedEvent, httpRequest);
eventDispatcher.dispatch(userCreatedEvent);
// Business logic
User user = userRepository.save(request.toUser());
// Track user registered event with user ID
Event userRegisteredEvent = eventBuilder.buildUserEvent(
user.getId(),
"user_registered",
Map.of(
"email", user.getEmail(),
"plan", user.getPlan(),
"registration_method", "email"
)
);
eventDispatcher.dispatch(userRegisteredEvent);
return user;
}
public void updateUserProfile(String userId, UpdateProfileRequest request) {
User user = userRepository.findById(userId)
.orElseThrow(() -> new UserNotFoundException(userId));
// Capture the old value before the update overwrites it
String previousEmail = user.getEmail();
user.updateProfile(request);
userRepository.save(user);
// Track profile update
Event profileUpdatedEvent = eventBuilder.buildUserEvent(
userId,
"profile_updated",
Map.of(
"fields_updated", request.getUpdatedFields(),
"previous_email", previousEmail
)
);
eventDispatcher.dispatch(profileUpdatedEvent);
}
}
@Service
public class OrderService {
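// OrderRepository is the application's own repository (assumed here)
private final OrderRepository orderRepository;
private final AsyncEventDispatcher eventDispatcher;
private final EventBuilder eventBuilder;
public OrderService(OrderRepository orderRepository, AsyncEventDispatcher eventDispatcher,
EventBuilder eventBuilder) {
this.orderRepository = orderRepository;
this.eventDispatcher = eventDispatcher;
this.eventBuilder = eventBuilder;
}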
public Order createOrder(String userId, CreateOrderRequest request) {
// Track order started
Event orderStartedEvent = eventBuilder.buildUserEvent(
userId,
"order_started",
Map.of(
"cart_size", request.getItems().size(),
"total_amount", request.getTotalAmount()
)
);
eventDispatcher.dispatch(orderStartedEvent);
// Business logic
Order order = orderRepository.save(Order.fromRequest(userId, request));
// Track order completed
List<String> productIds = request.getItems().stream()
.map(OrderItem::getProductId)
.collect(Collectors.toList());
Event orderCompletedEvent = eventBuilder.buildOrderCompletedEvent(
userId,
order.getId(),
order.getTotalAmount(),
productIds
);
eventDispatcher.dispatch(orderCompletedEvent);
return order;
}
public void cancelOrder(String userId, String orderId) {
Order order = orderRepository.findByIdAndUserId(orderId, userId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.cancel();
orderRepository.save(order);
// Track order cancellation
Event orderCancelledEvent = eventBuilder.buildUserEvent(
userId,
"order_cancelled",
Map.of(
"order_id", orderId,
"reason", "user_requested",
"refund_amount", order.getTotalAmount()
)
);
eventDispatcher.dispatch(orderCancelledEvent);
}
}
4. REST Controller Integration
@RestController
@RequestMapping("/api/events")
public class EventController {
private final AsyncEventDispatcher eventDispatcher;
private final EventBuilder eventBuilder;
private final EventEnricher eventEnricher;
public EventController(AsyncEventDispatcher eventDispatcher, EventBuilder eventBuilder,
EventEnricher eventEnricher) {
this.eventDispatcher = eventDispatcher;
this.eventBuilder = eventBuilder;
this.eventEnricher = eventEnricher;
}
@PostMapping("/track")
public ResponseEntity<Void> trackEvent(@RequestBody TrackEventRequest request,
HttpServletRequest httpRequest) {
Event event = eventBuilder.buildUserEvent(
request.getUserId(),
request.getEventName(),
request.getProperties()
);
// Enrich with context
event = eventEnricher.enrichWithHttpContext(event, httpRequest);
eventDispatcher.dispatch(event);
return ResponseEntity.accepted().build();
}
@PostMapping("/page-view")
public ResponseEntity<Void> trackPageView(@RequestBody PageViewRequest request,
HttpServletRequest httpRequest) {
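// referrer is optional; Map.of would throw NullPointerException on a null value
Map<String, Object> pageProperties = new HashMap<>();
pageProperties.put("url", request.getUrl());
pageProperties.put("referrer", request.getReferrer());
Event event = eventBuilder.buildPageViewEvent(
request.getUserId(),
request.getPageName(),
pageProperties
);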
event = eventEnricher.enrichWithHttpContext(event, httpRequest);
eventDispatcher.dispatch(event);
return ResponseEntity.accepted().build();
}
@PostMapping("/identify")
public ResponseEntity<Void> identifyUser(@RequestBody IdentifyUserRequest request) {
// MiddlewareClient.identifyUser(...) sends the identity synchronously over HTTP.
// To keep this endpoint non-blocking, the traits are queued as an "$identify" event instead.
Event identifyEvent = new Event("$identify", request.getUserId(), request.getTraits());
eventDispatcher.dispatch(identifyEvent);
return ResponseEntity.accepted().build();
}
}
// DTOs for the controller
public class TrackEventRequest {
@NotBlank
private String userId;
@NotBlank
private String eventName;
private Map<String, Object> properties;
// Getters and setters
}
public class PageViewRequest {
private String userId;
@NotBlank
private String pageName;
@NotBlank
private String url;
private String referrer;
// Getters and setters
}
public class IdentifyUserRequest {
@NotBlank
private String userId;
private Map<String, Object> traits;
// Getters and setters
}
5. Spring AOP for Automatic Event Tracking
@Aspect
@Component
public class EventTrackingAspect {
private final AsyncEventDispatcher eventDispatcher;
private final EventBuilder eventBuilder;
public EventTrackingAspect(AsyncEventDispatcher eventDispatcher, EventBuilder eventBuilder) {
this.eventDispatcher = eventDispatcher;
this.eventBuilder = eventBuilder;
}
@AfterReturning(pointcut = "@annotation(trackEvent)", returning = "result")
public void trackEvent(JoinPoint joinPoint, TrackEvent trackEvent, Object result) {
try {
String eventName = trackEvent.value();
Map<String, Object> properties = extractProperties(joinPoint, result, trackEvent);
String userId = extractUserId(joinPoint, trackEvent.userIdExpression());
Event event = eventBuilder.buildUserEvent(userId, eventName, properties);
eventDispatcher.dispatch(event);
} catch (Exception e) {
// Don't let event tracking break the main business logic
LoggerFactory.getLogger(joinPoint.getTarget().getClass())
.warn("Failed to track event: {}", trackEvent.value(), e);
}
}
private Map<String, Object> extractProperties(JoinPoint joinPoint, Object result, 
TrackEvent trackEvent) {
Map<String, Object> properties = new HashMap<>();
// Match each requested name against the method's parameter names
// (getParameterNames() can return null when names are not available at runtime)
Object[] args = joinPoint.getArgs();
String[] paramNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
for (String argName : trackEvent.includeArgs()) {
for (int i = 0; paramNames != null && i < paramNames.length; i++) {
if (paramNames[i].equals(argName)) {
properties.put(argName, args[i]);
}
}
}
// Include return value if specified
if (trackEvent.includeReturnValue() && result != null) {
properties.put("return_value", result.toString());
}
return properties;
}
private String extractUserId(JoinPoint joinPoint, String userIdExpression) {
if (StringUtils.hasText(userIdExpression)) {
// Use Spring Expression Language to extract user ID
// This is a simplified version
return evaluateExpression(joinPoint, userIdExpression);
}
return null;
}
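private String evaluateExpression(JoinPoint joinPoint, String expression) {
// Expose each method argument as a SpEL variable so "#userId" resolves to the argument value
String[] paramNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
Object[] args = joinPoint.getArgs();
StandardEvaluationContext context = new StandardEvaluationContext();
for (int i = 0; paramNames != null && i < paramNames.length; i++) {
context.setVariable(paramNames[i], args[i]);
}
Object value = new SpelExpressionParser().parseExpression(expression).getValue(context);
return value != null ? value.toString() : null;
}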
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TrackEvent {
String value();
String[] includeArgs() default {};
boolean includeReturnValue() default false;
String userIdExpression() default "";
}
// Usage in service
@Service
public class ProductService {
@TrackEvent(value = "product_viewed", includeArgs = {"productId"}, userIdExpression = "#userId")
public Product viewProduct(String userId, String productId) {
return productRepository.findById(productId)
.orElseThrow(() -> new ProductNotFoundException(productId));
}
@TrackEvent(value = "product_purchased", includeArgs = {"productId", "quantity"}, 
includeReturnValue = true, userIdExpression = "#userId")
public OrderItem purchaseProduct(String userId, String productId, int quantity) {
// Business logic
return orderItemRepository.save(new OrderItem(productId, quantity));
}
}

Monitoring and Health Checks

1. Health Indicator
@Component
public class MiddlewareHealthIndicator implements HealthIndicator {
private final MiddlewareConfig config;
private final AsyncEventDispatcher asyncDispatcher;
public MiddlewareHealthIndicator(MiddlewareConfig config, AsyncEventDispatcher asyncDispatcher) {
this.config = config;
this.asyncDispatcher = asyncDispatcher;
}
@Override
public Health health() {
if (!config.isEnabled()) {
return Health.outOfService()
.withDetail("message", "Middleware is disabled")
.build();
}
try {
Map<String, Object> metrics = asyncDispatcher.getMetrics();
int queueSize = (Integer) metrics.get("queue_size");
Health.Builder healthBuilder = Health.up()
.withDetail("queue_size", queueSize)
.withDetail("active_threads", metrics.get("active_threads"));
if (queueSize > 5000) {
healthBuilder.withDetail("warning", "Event queue is getting large");
}
return healthBuilder.build();
} catch (Exception e) {
return Health.down(e)
.withDetail("error", "Middleware health check failed")
.build();
}
}
}
2. Metrics Collector
@Component
public class MiddlewareMetrics {
private final MeterRegistry meterRegistry;
private final Counter eventsSent;
private final Counter eventsFailed;
private final Gauge queueSize;
public MiddlewareMetrics(MeterRegistry meterRegistry, AsyncEventDispatcher asyncDispatcher) {
this.meterRegistry = meterRegistry;
this.eventsSent = Counter.builder("middleware.events.sent")
.description("Number of events sent to Middleware")
.register(meterRegistry);
this.eventsFailed = Counter.builder("middleware.events.failed")
.description("Number of events that failed to send")
.register(meterRegistry);
// Micrometer's Gauge.builder takes the observed object and value function up front
this.queueSize = Gauge.builder("middleware.queue.size", asyncDispatcher, AsyncEventDispatcher::getQueueSize)
.description("Current size of the event queue")
.register(meterRegistry);
}
public void incrementEventsSent() {
eventsSent.increment();
}
public void incrementEventsFailed() {
eventsFailed.increment();
}
}

Testing

1. Unit Tests
@ExtendWith(MockitoExtension.class)
class MiddlewareClientTest {
@Mock
private CloseableHttpClient httpClient;
@Mock
private CloseableHttpResponse httpResponse;
private MiddlewareClient middlewareClient;
private MiddlewareConfig config;
@BeforeEach
void setUp() {
config = new MiddlewareConfig();
config.setApiKey("test-key");
config.setSourceId("test-source");
config.setBaseUrl("https://api.middleware.io");
// JavaTimeModule is required to serialize the Instant timestamp on Event
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
middlewareClient = new MiddlewareClient(config, httpClient, objectMapper);
}
@Test
void shouldTrackEventSuccessfully() throws Exception {
// Given
Event event = new Event("test_event", "user-123", Map.of("test", "value"));
when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
// HttpClient 5 exposes the status code directly on the response (no StatusLine)
when(httpResponse.getCode()).thenReturn(200);
// When
middlewareClient.trackEvent(event);
// Then
verify(httpClient).execute(any(HttpPost.class));
}
}
@SpringBootTest
@TestPropertySource(properties = {
"middleware.api-key=test-key",
"middleware.source-id=test-source",
"middleware.enabled=true"
})
class MiddlewareIntegrationTest {
@Autowired
private AsyncEventDispatcher eventDispatcher;
@Test
void shouldDispatchEventAsync() {
// Given
Event event = new Event("integration_test", "user-123", Map.of("test", "value"));
// When
eventDispatcher.dispatch(event);
// Then - wait for async processing
await().atMost(5, TimeUnit.SECONDS)
.until(() -> eventDispatcher.getQueueSize() == 0);
}
}
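
An empty queue only shows that a consumer picked the event up, not that it was delivered. A stricter check waits on a signal from the receiving side. The sketch below illustrates the idea with plain JDK classes. `LatchedDeliveryCheck` and its `sink` are hypothetical stand-ins for the dispatcher and for `MiddlewareClient.trackEvent`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical test helper: asserts on delivery, not on queue size.
class LatchedDeliveryCheck {
    /** Returns true if the sink received the event within the timeout. */
    static boolean deliveredWithin(String event, long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch delivered = new CountDownLatch(1);
        // Stand-in for the real client call; counts down once the event arrives.
        Consumer<String> sink = e -> delivered.countDown();

        Thread consumer = new Thread(() -> {
            try {
                sink.accept(queue.take());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "test-consumer");
        consumer.setDaemon(true);
        consumer.start();

        queue.offer(event);
        return delivered.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

In a Spring test the same effect can be had by replacing `MiddlewareClient` with a mock that counts down a latch, then awaiting the latch instead of polling `getQueueSize()`.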

Best Practices

  1. Error Handling: Never let event tracking break main business logic
  2. Performance: Use async processing for high-volume events
  3. Security: Don't send sensitive data in events
  4. Monitoring: Track queue sizes and error rates
  5. Retry Logic: Implement retry mechanisms for failed events
  6. Batching: Use batch processing for better performance
// Example of safe event tracking with validation
@Component
public class SafeEventTracker {
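private final AsyncEventDispatcher eventDispatcher;
private final Set<String> sensitiveFields = Set.of("password", "token", "credit_card");
// The final field needs a constructor so Spring can inject the dispatcher
public SafeEventTracker(AsyncEventDispatcher eventDispatcher) {
this.eventDispatcher = eventDispatcher;
}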
public void trackSafeEvent(Event event) {
Event sanitizedEvent = sanitizeEvent(event);
eventDispatcher.dispatch(sanitizedEvent);
}
private Event sanitizeEvent(Event event) {
if (event.getProperties() != null) {
Map<String, Object> sanitizedProperties = event.getProperties().entrySet().stream()
.filter(entry -> !isSensitiveField(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
event.setProperties(sanitizedProperties);
}
return event;
}
private boolean isSensitiveField(String fieldName) {
return sensitiveFields.stream()
.anyMatch(sensitive -> fieldName.toLowerCase().contains(sensitive));
}
}
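
The tracker above filters only top-level keys and mutates the event it receives. If properties can contain nested maps, a recursive variant that returns a new map may be a better fit. This is a sketch with a hypothetical `PropertySanitizer` class. It reuses the same three sensitive substrings and does not descend into lists.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: returns a sanitized copy and leaves the input untouched.
class PropertySanitizer {
    private static final Set<String> SENSITIVE = Set.of("password", "token", "credit_card");

    static Map<String, Object> sanitize(Map<?, ?> input) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : input.entrySet()) {
            String key = String.valueOf(entry.getKey());
            if (isSensitive(key)) {
                continue; // drop the whole entry, including any nested content
            }
            Object value = entry.getValue();
            // Recurse into nested maps; other values (including null) are copied as-is.
            out.put(key, value instanceof Map ? sanitize((Map<?, ?>) value) : value);
        }
        return out;
    }

    private static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        return SENSITIVE.stream().anyMatch(lower::contains);
    }
}
```

Because the result goes into a `LinkedHashMap`, null property values are preserved, whereas `Collectors.toMap` would throw on them.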

Conclusion

Middleware.io integration in Java provides:

  • Real-time event tracking for user actions and system events
  • Scalable async processing with configurable batching
  • Rich context enrichment for better analytics
  • Spring Boot integration for easy setup
  • Comprehensive monitoring and health checks

By implementing the patterns shown above, you can build robust, production-ready event tracking systems that scale with your application while maintaining performance and reliability.
