JFR Event Streaming to Kafka in Java

Overview

Java Flight Recorder (JFR) Event Streaming allows real-time monitoring of JVM events by streaming them directly to external systems like Kafka. This enables centralized monitoring, analysis, and alerting for Java applications.

Key Components

  • JFR Event Streaming API (JDK 14+)
  • Kafka Producer API
  • Custom Event Processing
  • Configuration Management

Basic Setup

1. Dependencies

<!-- Maven dependencies used by the examples below: the Kafka producer client and
     Jackson (core + databind) for serializing events to JSON. -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>

2. Basic JFR to Kafka Streamer

import jdk.jfr.consumer.*;
import org.apache.kafka.clients.producer.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
/**
 * Streams a fixed set of JFR events from the current JVM to a Kafka topic as JSON.
 *
 * <p>{@link #startStreaming()} blocks the calling thread until the stream is closed,
 * so it is normally invoked from a dedicated thread. {@link #stop()} may be called
 * from any other thread to end the stream and close the producer.
 */
public class BasicJfrKafkaStreamer {
    private final KafkaProducer<String, String> kafkaProducer;
    private final String topic;
    private final ObjectMapper objectMapper;
    private volatile boolean running = true;
    // Held in a field (not a local) so stop() can close the stream and thereby
    // unblock the thread that is parked inside startStreaming().
    private volatile RecordingStream recordingStream;

    /**
     * Creates the streamer and its Kafka producer.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param topic            topic every event is published to
     */
    public BasicJfrKafkaStreamer(String bootstrapServers, String topic) {
        this.topic = topic;
        this.objectMapper = new ObjectMapper();
        // Kafka producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        this.kafkaProducer = new KafkaProducer<>(props);
    }

    /**
     * Opens a recording stream, enables the event types and forwards every event
     * to Kafka. Blocks until the stream is closed; failures are reported on stderr.
     * The stream is always closed when this method returns.
     */
    public void startStreaming() {
        try (RecordingStream stream = new RecordingStream()) {
            recordingStream = stream;
            // Enable common event types
            stream.enable("jdk.CPULoad").withPeriod(Duration.ofSeconds(1));
            stream.enable("jdk.GarbageCollection").withPeriod(Duration.ofSeconds(5));
            stream.enable("jdk.JavaMonitorEnter").withThreshold(Duration.ofMillis(10));
            stream.enable("jdk.ThreadSleep").withThreshold(Duration.ofSeconds(1));
            // Subscribe to events
            stream.onEvent(this::handleJfrEvent);
            // stop() may have run before the field above was assigned; in that
            // case nobody would ever close the stream, so do not start it.
            if (!running) {
                return;
            }
            // Start the stream (blocks until closed)
            stream.start();
        } catch (Exception e) {
            System.err.println("Failed to start JFR streaming: " + e.getMessage());
        } finally {
            recordingStream = null;
        }
    }

    /** Serializes one event to JSON and sends it asynchronously to Kafka. */
    private void handleJfrEvent(RecordedEvent event) {
        if (!running) {
            return; // producer is closed (or closing); drop late events
        }
        try {
            String eventName = event.getEventType().getName();
            Map<String, Object> eventData = parseEventData(event);
            // Create Kafka message
            String message = objectMapper.writeValueAsString(eventData);
            String key = eventName + "-" + System.currentTimeMillis();
            // Send to Kafka
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Failed to send event to Kafka: " + exception.getMessage());
                }
            });
        } catch (Exception e) {
            System.err.println("Error processing JFR event: " + e.getMessage());
        }
    }

    /**
     * Flattens an event into an insertion-ordered map: event type, start time,
     * duration in nanoseconds, then every declared field rendered via toString().
     * A field whose value cannot be read is recorded as an "ERROR: ..." string.
     */
    private Map<String, Object> parseEventData(RecordedEvent event) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("eventType", event.getEventType().getName());
        data.put("startTime", event.getStartTime().toString());
        data.put("duration", event.getDuration().toNanos());
        // Add event-specific fields
        event.getFields().forEach(field -> {
            try {
                Object value = event.getValue(field.getName());
                data.put(field.getName(), value != null ? value.toString() : null);
            } catch (Exception e) {
                data.put(field.getName(), "ERROR: " + e.getMessage());
            }
        });
        return data;
    }

    /**
     * Stops forwarding, closes the recording stream (which lets
     * {@link #startStreaming()} return) and closes the producer, waiting up to
     * five seconds for in-flight sends.
     */
    public void stop() {
        running = false;
        RecordingStream stream = recordingStream;
        if (stream != null) {
            stream.close();
        }
        kafkaProducer.close(Duration.ofSeconds(5));
    }
}

Advanced Implementation

1. Configurable JFR Kafka Streamer

import jdk.jfr.Configuration;
import jdk.jfr.FlightRecorder;
import jdk.jfr.consumer.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
 * JFR-to-Kafka streamer whose topic, Kafka properties and enabled events come from
 * a {@link JfrStreamerConfig}. The recording stream runs on a private
 * single-thread executor, so {@link #start()} returns immediately.
 *
 * <p>{@link #stop()} is idempotent: it is reachable from the caller, from the
 * stream's close handler and from the failure path, and only the first call does
 * any work.
 */
public class ConfigurableJfrKafkaStreamer {
    private final KafkaProducer<String, String> kafkaProducer;
    private final JfrStreamerConfig config;
    private final ObjectMapper objectMapper;
    private final ExecutorService executor;
    private volatile boolean running = false;
    // Written on the executor thread, read by stop() on another thread.
    private volatile RecordingStream recordingStream;
    // Thread that runs streamEvents(); lets stop() avoid waiting for itself.
    private volatile Thread streamThread;
    // Guarded by "this": set by the first stop() call.
    private boolean stopped = false;

    /**
     * @param config topic, Kafka producer properties and per-event settings
     */
    public ConfigurableJfrKafkaStreamer(JfrStreamerConfig config) {
        this.config = config;
        this.objectMapper = new ObjectMapper();
        this.executor = Executors.newSingleThreadExecutor();
        Properties props = new Properties();
        props.putAll(config.getKafkaProperties());
        this.kafkaProducer = new KafkaProducer<>(props);
    }

    /**
     * Starts streaming on the background executor.
     *
     * @throws IllegalStateException if the streamer is already running
     */
    public void start() {
        if (running) {
            throw new IllegalStateException("Streamer is already running");
        }
        running = true;
        executor.submit(this::streamEvents);
    }

    /** Executor task: builds the stream, wires handlers and blocks in start(). */
    private void streamEvents() {
        streamThread = Thread.currentThread();
        try {
            RecordingStream stream = new RecordingStream();
            recordingStream = stream;
            // Configure event settings
            configureEvents(stream);
            // Set event handlers
            stream.onEvent(this::processEvent);
            stream.onFlush(this::handleFlush);
            stream.onClose(this::handleClose);
            // stop() may have completed before the stream existed; it could not
            // close it then, so close it here instead of starting it.
            if (!running) {
                stream.close();
                return;
            }
            // Start streaming (blocks until the stream is closed)
            stream.start();
        } catch (Exception e) {
            System.err.println("JFR streaming failed: " + e.getMessage());
            stop();
        }
    }

    /** Applies period / threshold / stack-trace settings for each configured event. */
    private void configureEvents(RecordingStream stream) {
        config.getEventConfigs().forEach((eventName, eventConfig) -> {
            try {
                EventSettings settings = stream.enable(eventName);
                if (eventConfig.getPeriod() != null) {
                    settings.withPeriod(eventConfig.getPeriod());
                }
                if (eventConfig.getThreshold() != null) {
                    settings.withThreshold(eventConfig.getThreshold());
                }
                if (eventConfig.getStackTrace() != null) {
                    settings.withStackTrace(eventConfig.getStackTrace());
                }
            } catch (Exception e) {
                System.err.println("Failed to configure event: " + eventName + " - " + e.getMessage());
            }
        });
    }

    /** Wraps the event, serializes it to JSON and sends it to the configured topic. */
    private void processEvent(RecordedEvent event) {
        if (!running) {
            return;
        }
        try {
            JfrEventMessage message = new JfrEventMessage(event);
            String jsonMessage = objectMapper.writeValueAsString(message);
            String key = generateKey(event);
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(config.getTopic(), key, jsonMessage);
            kafkaProducer.send(record, new JfrEventCallback(event.getEventType().getName()));
        } catch (Exception e) {
            System.err.println("Failed to process JFR event: " + e.getMessage());
        }
    }

    /** Key format: {@code <eventType>-<startEpochMillis>-<random 0..999>}. */
    private String generateKey(RecordedEvent event) {
        return String.format("%s-%d-%d",
                event.getEventType().getName(),
                event.getStartTime().toEpochMilli(),
                ThreadLocalRandom.current().nextInt(1000));
    }

    private void handleFlush() {
        System.out.println("JFR stream flushed");
        kafkaProducer.flush();
    }

    private void handleClose() {
        System.out.println("JFR stream closed");
        stop();
    }

    /**
     * Stops streaming: closes the recording stream, shuts the executor down and
     * closes the producer (waiting up to five seconds). Safe to call repeatedly and
     * from any thread; calls after the first return immediately.
     */
    public void stop() {
        synchronized (this) {
            if (stopped) {
                return;
            }
            stopped = true;
        }
        running = false;
        RecordingStream stream = recordingStream;
        if (stream != null) {
            stream.close();
        }
        executor.shutdown();
        // When stop() runs on the stream thread itself (failure path or close
        // handler), awaiting the executor would only wait for this very thread.
        if (Thread.currentThread() != streamThread) {
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        kafkaProducer.close(Duration.ofSeconds(5));
    }

    // Configuration classes

    /** Mutable settings bean: target topic, per-event settings, Kafka properties. */
    public static class JfrStreamerConfig {
        private String topic;
        private Map<String, EventConfig> eventConfigs = new HashMap<>();
        private Properties kafkaProperties = new Properties();

        // Getters and setters
        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }
        public Map<String, EventConfig> getEventConfigs() { return eventConfigs; }
        public void setEventConfigs(Map<String, EventConfig> eventConfigs) {
            this.eventConfigs = eventConfigs;
        }
        public Properties getKafkaProperties() { return kafkaProperties; }
        public void setKafkaProperties(Properties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
        }
    }

    /** Optional settings for one event type; a null property is left at its default. */
    public static class EventConfig {
        private Duration period;
        private Duration threshold;
        private Boolean stackTrace;

        // Getters and setters
        public Duration getPeriod() { return period; }
        public void setPeriod(Duration period) { this.period = period; }
        public Duration getThreshold() { return threshold; }
        public void setThreshold(Duration threshold) { this.threshold = threshold; }
        public Boolean getStackTrace() { return stackTrace; }
        public void setStackTrace(Boolean stackTrace) { this.stackTrace = stackTrace; }
    }

    // Event message wrapper

    /**
     * JSON-serializable snapshot of an event: type name, start time (epoch millis),
     * duration (nanoseconds) and all declared fields.
     */
    public static class JfrEventMessage {
        private final String eventType;
        private final long startTime;
        private final long duration;
        private final Map<String, Object> fields;

        public JfrEventMessage(RecordedEvent event) {
            this.eventType = event.getEventType().getName();
            this.startTime = event.getStartTime().toEpochMilli();
            this.duration = event.getDuration().toNanos();
            this.fields = extractFields(event);
        }

        /** Copies every field; unreadable fields become an "ERROR: ..." string. */
        private Map<String, Object> extractFields(RecordedEvent event) {
            Map<String, Object> fieldMap = new LinkedHashMap<>();
            event.getFields().forEach(field -> {
                try {
                    Object value = event.getValue(field.getName());
                    fieldMap.put(field.getName(), convertValue(value));
                } catch (Exception e) {
                    fieldMap.put(field.getName(), "ERROR: " + e.getMessage());
                }
            });
            return fieldMap;
        }

        /** Structured JFR objects are flattened to their toString() form. */
        private Object convertValue(Object value) {
            if (value == null) {
                return null;
            }
            if (value instanceof RecordedObject) {
                return value.toString(); // Simplified handling
            }
            return value;
        }

        // Getters for JSON serialization
        public String getEventType() { return eventType; }
        public long getStartTime() { return startTime; }
        public long getDuration() { return duration; }
        public Map<String, Object> getFields() { return fields; }
    }

    // Kafka callback

    /** Logs a send failure together with the event type. Static: needs no outer state. */
    private static class JfrEventCallback implements Callback {
        private final String eventType;

        public JfrEventCallback(String eventType) {
            this.eventType = eventType;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Failed to send " + eventType + " to Kafka: " + exception.getMessage());
            }
        }
    }
}

2. Event Filtering and Transformation

/**
 * Wraps a {@link ConfigurableJfrKafkaStreamer} together with event filters and
 * transformers.
 *
 * <p>NOTE(review): this class only stores the filters and transformers; nothing in
 * it applies them, and the delegate is created from the config alone. Events
 * therefore flow through the delegate unfiltered until a hook is added there.
 */
public class FilteredJfrKafkaStreamer {
    private final ConfigurableJfrKafkaStreamer delegate;
    private final List<EventFilter> filters;
    private final List<EventTransformer> transformers;

    /**
     * @param config       configuration handed to the delegate streamer
     * @param filters      filters to keep; {@code null} is treated as empty
     * @param transformers transformers to keep; {@code null} is treated as empty
     */
    public FilteredJfrKafkaStreamer(JfrStreamerConfig config,
                                    List<EventFilter> filters,
                                    List<EventTransformer> transformers) {
        this.delegate = new ConfigurableJfrKafkaStreamer(config);
        this.filters = filters != null ? filters : new ArrayList<>();
        this.transformers = transformers != null ? transformers : new ArrayList<>();
    }

    public void start() {
        delegate.start();
    }

    public void stop() {
        delegate.stop();
    }

    // Filter interface
    /** Decides whether an event should be forwarded. */
    public interface EventFilter {
        boolean shouldInclude(RecordedEvent event);
    }

    // Transformer interface
    /** Converts an event into the map that is to be serialized. */
    public interface EventTransformer {
        Map<String, Object> transform(RecordedEvent event);
    }

    // Built-in filters
    /** Accepts events whose duration is at least the given number of nanoseconds. */
    public static class DurationFilter implements EventFilter {
        private final long minDurationNanos;

        public DurationFilter(long minDurationNanos) {
            this.minDurationNanos = minDurationNanos;
        }

        @Override
        public boolean shouldInclude(RecordedEvent event) {
            return event.getDuration().toNanos() >= minDurationNanos;
        }
    }

    /** Accepts events whose type name is contained in the given set. */
    public static class EventTypeFilter implements EventFilter {
        private final Set<String> includedTypes;

        public EventTypeFilter(Set<String> includedTypes) {
            this.includedTypes = includedTypes;
        }

        @Override
        public boolean shouldInclude(RecordedEvent event) {
            return includedTypes.contains(event.getEventType().getName());
        }
    }

    // Built-in transformers
    /**
     * Emits event type and timestamp for every event and, for
     * {@code jdk.GarbageCollection}, the GC id, name and longest pause in ms.
     */
    public static class SimplifiedGcTransformer implements EventTransformer {
        @Override
        public Map<String, Object> transform(RecordedEvent event) {
            Map<String, Object> simplified = new HashMap<>();
            simplified.put("eventType", event.getEventType().getName());
            simplified.put("timestamp", event.getStartTime().toEpochMilli());
            if ("jdk.GarbageCollection".equals(event.getEventType().getName())) {
                simplified.put("gcId", event.getValue("gcId"));
                // Report the event's own longestPause field; the overall event
                // duration (previously used here) covers the whole collection.
                // Falls back to the event duration if the field is absent.
                long longestPauseMs = event.hasField("longestPause")
                        ? event.getDuration("longestPause").toMillis()
                        : event.getDuration().toMillis();
                simplified.put("longestPause", longestPauseMs);
                simplified.put("name", event.getValue("name"));
            }
            return simplified;
        }
    }

    /**
     * Emits event type and timestamp for every event and, for {@code jdk.CPULoad},
     * the jvmUser, jvmSystem and machineTotal values.
     */
    public static class CpuLoadTransformer implements EventTransformer {
        @Override
        public Map<String, Object> transform(RecordedEvent event) {
            Map<String, Object> transformed = new HashMap<>();
            transformed.put("eventType", event.getEventType().getName());
            transformed.put("timestamp", event.getStartTime().toEpochMilli());
            if ("jdk.CPULoad".equals(event.getEventType().getName())) {
                transformed.put("jvmUser", event.getValue("jvmUser"));
                transformed.put("jvmSystem", event.getValue("jvmSystem"));
                transformed.put("machineTotal", event.getValue("machineTotal"));
            }
            return transformed;
        }
    }
}

Practical Examples

Example 1: Production-Ready JFR Monitor

/**
 * Bundles a {@link ConfigurableJfrKafkaStreamer} (CPU, heap, GC and monitor-enter
 * events) with a per-minute metrics reporter and an alert checker.
 *
 * <p>NOTE(review): nothing in this class calls {@code MetricsCollector.recordEvent}
 * or {@code AlertManager.checkForAlerts}; the streamer exposes no per-event hook
 * here, so both helpers stay idle until they are wired to the event stream.
 */
public class ProductionJfrMonitor {
    private final ConfigurableJfrKafkaStreamer streamer;
    private final MetricsCollector metrics;
    private final AlertManager alertManager;

    /**
     * @param bootstrapServers Kafka bootstrap server list
     * @param topic            topic events are published to
     */
    public ProductionJfrMonitor(String bootstrapServers, String topic) {
        JfrStreamerConfig config = createConfig(bootstrapServers, topic);
        this.streamer = new ConfigurableJfrKafkaStreamer(config);
        this.metrics = new MetricsCollector();
        this.alertManager = new AlertManager();
    }

    /** Builds the Kafka producer properties and the per-event JFR settings. */
    private JfrStreamerConfig createConfig(String bootstrapServers, String topic) {
        JfrStreamerConfig config = new JfrStreamerConfig();
        config.setTopic(topic);
        // Kafka configuration
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        config.setKafkaProperties(kafkaProps);
        // JFR event configuration
        Map<String, EventConfig> eventConfigs = new HashMap<>();
        // CPU monitoring
        EventConfig cpuConfig = new EventConfig();
        cpuConfig.setPeriod(Duration.ofSeconds(5));
        eventConfigs.put("jdk.CPULoad", cpuConfig);
        // Memory monitoring
        EventConfig memoryConfig = new EventConfig();
        memoryConfig.setPeriod(Duration.ofSeconds(10));
        eventConfigs.put("jdk.GCHeapSummary", memoryConfig);
        // GC monitoring
        EventConfig gcConfig = new EventConfig();
        gcConfig.setPeriod(Duration.ofSeconds(30));
        eventConfigs.put("jdk.GarbageCollection", gcConfig);
        // Thread monitoring
        EventConfig threadConfig = new EventConfig();
        threadConfig.setThreshold(Duration.ofMillis(100));
        eventConfigs.put("jdk.JavaMonitorEnter", threadConfig);
        config.setEventConfigs(eventConfigs);
        return config;
    }

    public void startMonitoring() {
        System.out.println("Starting JFR to Kafka monitoring...");
        streamer.start();
        metrics.start();
    }

    public void stopMonitoring() {
        System.out.println("Stopping JFR to Kafka monitoring...");
        streamer.stop();
        metrics.stop();
    }

    // Inner classes for supporting functionality

    /** Counts events per type and prints, then resets, the counts once a minute. */
    private static class MetricsCollector {
        private final Map<String, Long> eventCounts = new ConcurrentHashMap<>();
        private ScheduledExecutorService scheduler;

        public void start() {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(this::reportMetrics, 1, 1, TimeUnit.MINUTES);
        }

        public void stop() {
            if (scheduler != null) {
                scheduler.shutdown();
            }
        }

        private void reportMetrics() {
            System.out.println("JFR Event Metrics:");
            // Remove each entry as it is reported instead of forEach + clear():
            // an increment landing between the two would otherwise be lost.
            for (String eventType : eventCounts.keySet()) {
                Long count = eventCounts.remove(eventType);
                if (count != null) {
                    System.out.printf("  %s: %d events/min%n", eventType, count);
                }
            }
        }

        public void recordEvent(String eventType) {
            eventCounts.merge(eventType, 1L, Long::sum);
        }
    }

    /** Prints alerts to stderr for high CPU, long GC pauses and long monitor waits. */
    private static class AlertManager {
        private static final double CPU_THRESHOLD = 0.9; // 90%
        private static final long GC_THRESHOLD_MS = 1000; // 1 second
        private static final long MONITOR_WAIT_THRESHOLD_MS = 1000; // 1 second

        /** Dispatches on the event type name; other types are ignored. */
        public void checkForAlerts(RecordedEvent event) {
            String eventType = event.getEventType().getName();
            switch (eventType) {
                case "jdk.CPULoad":
                    checkCpuAlert(event);
                    break;
                case "jdk.GarbageCollection":
                    checkGcAlert(event);
                    break;
                case "jdk.JavaMonitorEnter":
                    checkThreadAlert(event);
                    break;
                default:
                    break;
            }
        }

        private void checkCpuAlert(RecordedEvent event) {
            if (!event.hasField("machineTotal")) {
                return;
            }
            // getDouble() widens a float field; a generic getValue() assigned to
            // Double would throw ClassCastException for a Float value.
            double machineTotal = event.getDouble("machineTotal");
            if (machineTotal > CPU_THRESHOLD) {
                System.err.printf("HIGH CPU ALERT: %.2f%% utilization%n", machineTotal * 100);
            }
        }

        private void checkGcAlert(RecordedEvent event) {
            long durationMs = event.getDuration().toMillis();
            if (durationMs > GC_THRESHOLD_MS) {
                System.err.printf("LONG GC ALERT: %d ms pause%n", durationMs);
            }
        }

        private void checkThreadAlert(RecordedEvent event) {
            // Use the unit-aware Duration accessor rather than the raw "duration"
            // value, whose unit is not milliseconds.
            long waitedMs = event.getDuration().toMillis();
            if (waitedMs > MONITOR_WAIT_THRESHOLD_MS) {
                // monitorClass is a RecordedClass, not a String.
                RecordedClass monitorClass =
                        event.hasField("monitorClass") ? event.getClass("monitorClass") : null;
                String monitorName = monitorClass != null ? monitorClass.getName() : "unknown";
                System.err.printf("LONG MONITOR WAIT: %d ms for %s%n", waitedMs, monitorName);
            }
        }
    }
}

Example 2: Custom JFR Events to Kafka

// Custom JFR event
import jdk.jfr.*;
/**
 * Custom JFR event carrying an application-level event type, the user it concerns,
 * a duration in milliseconds and a success flag.
 *
 * <p>NOTE(review): JFR events also carry an implicit duration; confirm that a
 * user-declared field named {@code duration} does not clash with it.
 */
@Name("com.example.ApplicationEvent")
@Label("Application Custom Event")
@Category("Application")
@Description("Custom application-level events")
public class ApplicationEvent extends Event {

    @Label("Event Type")
    @Description("Type of application event")
    private String type;

    @Label("User ID")
    @Description("User associated with the event")
    private String userId;

    @Label("Duration")
    @Description("Event duration in milliseconds")
    private long duration;

    @Label("Success")
    @Description("Whether the operation was successful")
    private boolean success;

    // Accessors

    /** @return the application event type */
    public String getType() {
        return this.type;
    }

    /** @return the user associated with the event */
    public String getUserId() {
        return this.userId;
    }

    /** @return the duration value stored on this event */
    public long getDuration() {
        return this.duration;
    }

    /** @return whether the operation was successful */
    public boolean isSuccess() {
        return this.success;
    }

    // Mutators

    public void setType(String type) {
        this.type = type;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public void setDuration(long duration) {
        this.duration = duration;
    }

    public void setSuccess(boolean success) {
        this.success = success;
    }
}
// Custom event producer and streamer
/**
 * Streams the custom {@code com.example.ApplicationEvent} to Kafka through a
 * {@link ConfigurableJfrKafkaStreamer} and offers a helper for committing such
 * events.
 */
public class CustomJfrEventStreamer {
    private final ConfigurableJfrKafkaStreamer streamer;

    /**
     * @param bootstrapServers Kafka bootstrap server list
     * @param topic            topic events are published to
     */
    public CustomJfrEventStreamer(String bootstrapServers, String topic) {
        JfrStreamerConfig config = createConfig(bootstrapServers, topic);
        this.streamer = new ConfigurableJfrKafkaStreamer(config);
    }

    /** Builds a config that enables only the custom event, with stack traces. */
    private JfrStreamerConfig createConfig(String bootstrapServers, String topic) {
        JfrStreamerConfig config = new JfrStreamerConfig();
        config.setTopic(topic);
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        config.setKafkaProperties(kafkaProps);
        // Enable custom events
        Map<String, EventConfig> eventConfigs = new HashMap<>();
        EventConfig customEventConfig = new EventConfig();
        customEventConfig.setStackTrace(true);
        eventConfigs.put("com.example.ApplicationEvent", customEventConfig);
        config.setEventConfigs(eventConfigs);
        return config;
    }

    public void start() {
        streamer.start();
    }

    public void stop() {
        streamer.stop();
    }

    /**
     * Creates, populates and commits one {@code ApplicationEvent}.
     *
     * @param type     application event type
     * @param userId   user associated with the event
     * @param duration duration value stored on the event
     * @param success  whether the operation succeeded
     */
    public void recordApplicationEvent(String type, String userId, long duration, boolean success) {
        ApplicationEvent event = new ApplicationEvent();
        event.setType(type);
        event.setUserId(userId);
        event.setDuration(duration);
        event.setSuccess(success);
        event.commit();
    }

    // Custom event processor

    /**
     * Publishes custom events with its own producer, keyed by the event's
     * {@code type} field. Call {@link #close()} when done to release the producer.
     */
    public static class CustomEventProcessor implements AutoCloseable {
        // One shared mapper instead of a new instance per event.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        private final KafkaProducer<String, String> producer;
        private final String topic;

        public CustomEventProcessor(String bootstrapServers, String topic) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            this.producer = new KafkaProducer<>(props);
            this.topic = topic;
        }

        /**
         * Serializes the event's type, timestamp and custom fields to JSON and
         * sends it. Failures (serialization or delivery) are reported on stderr.
         */
        public void processCustomEvent(RecordedEvent event) {
            try {
                Map<String, Object> eventData = new HashMap<>();
                eventData.put("eventType", event.getEventType().getName());
                eventData.put("timestamp", event.getStartTime().toEpochMilli());
                eventData.put("type", event.getValue("type"));
                eventData.put("userId", event.getValue("userId"));
                eventData.put("duration", event.getValue("duration"));
                eventData.put("success", event.getValue("success"));
                String message = MAPPER.writeValueAsString(eventData);
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, event.getString("type"), message);
                // Report asynchronous delivery failures instead of dropping them.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to process custom event: " + exception.getMessage());
                    }
                });
            } catch (Exception e) {
                System.err.println("Failed to process custom event: " + e.getMessage());
            }
        }

        /** Closes the producer, waiting up to five seconds for in-flight sends. */
        @Override
        public void close() {
            producer.close(Duration.ofSeconds(5));
        }
    }
}

Example 3: Kubernetes Deployment with JFR Streaming

# deployment.yaml
# Runs two replicas of the JFR-to-Kafka streamer.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jfr-kafka-streamer
spec:
  replicas: 2
  selector:
    matchLabels:
      app: jfr-kafka-streamer
  template:
    metadata:
      labels:
        app: jfr-kafka-streamer
    spec:
      containers:
        - name: jfr-streamer
          image: mycompany/jfr-kafka-streamer:latest
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka-cluster:9092"
            - name: KAFKA_TOPIC
              value: "jfr-events"
            # KubernetesJfrStreamer reads NAMESPACE; expose the pod's namespace
            # via the downward API so it does not fall back to "default".
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            # -XX:StartFlightRecording is sufficient to enable JFR; the legacy
            # -XX:+FlightRecorder switch is not needed on JDK 14+.
            # NOTE(review): the JVM does not read JVM_OPTS itself; confirm the
            # image entrypoint passes it to java (or use JAVA_TOOL_OPTIONS).
            - name: JVM_OPTS
              value: "-XX:StartFlightRecording=disk=true,maxsize=1G,maxage=24h"
          resources:
            requests:
              memory: "512Mi"
              cpu: "200m"
            limits:
              memory: "1Gi"
              cpu: "500m"
// Kubernetes-aware streamer
/**
 * Streamer that identifies itself by Kubernetes pod name and namespace, read from
 * the {@code HOSTNAME} and {@code NAMESPACE} environment variables (defaults
 * "unknown" and "default").
 */
public class KubernetesJfrStreamer {
    private final String podName;
    private final String namespace;
    private final ConfigurableJfrKafkaStreamer streamer;
    // Ensures the shutdown hook is registered once even if start() is called again.
    private final java.util.concurrent.atomic.AtomicBoolean hookRegistered =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    // Ensures stop() does its work once (explicit call plus shutdown hook).
    private final java.util.concurrent.atomic.AtomicBoolean stopped =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    /**
     * @param bootstrapServers Kafka bootstrap server list
     * @param topic            topic events are published to
     */
    public KubernetesJfrStreamer(String bootstrapServers, String topic) {
        this.podName = System.getenv().getOrDefault("HOSTNAME", "unknown");
        this.namespace = System.getenv().getOrDefault("NAMESPACE", "default");
        JfrStreamerConfig config = createKubernetesConfig(bootstrapServers, topic);
        this.streamer = new ConfigurableJfrKafkaStreamer(config);
    }

    /** Builds the config; the pod name doubles as the Kafka client id. */
    private JfrStreamerConfig createKubernetesConfig(String bootstrapServers, String topic) {
        JfrStreamerConfig config = new JfrStreamerConfig();
        config.setTopic(topic);
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, podName);
        config.setKafkaProperties(kafkaProps);
        // Enhanced event configuration for Kubernetes
        Map<String, EventConfig> eventConfigs = new HashMap<>();
        // Container resource monitoring
        EventConfig containerConfig = new EventConfig();
        containerConfig.setPeriod(Duration.ofSeconds(10));
        eventConfigs.put("jdk.CPULoad", containerConfig);
        eventConfigs.put("jdk.GCHeapSummary", containerConfig);
        // Application performance monitoring
        EventConfig perfConfig = new EventConfig();
        perfConfig.setThreshold(Duration.ofMillis(100));
        eventConfigs.put("jdk.JavaMonitorEnter", perfConfig);
        eventConfigs.put("jdk.ThreadSleep", perfConfig);
        config.setEventConfigs(eventConfigs);
        return config;
    }

    /** Starts the delegate streamer and registers a JVM shutdown hook (once). */
    public void start() {
        System.out.printf("Starting JFR streaming from pod %s in namespace %s%n",
                podName, namespace);
        streamer.start();
        // Add shutdown hook for graceful shutdown
        if (hookRegistered.compareAndSet(false, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        }
    }

    /** Stops the delegate streamer; calls after the first are ignored. */
    public void stop() {
        if (!stopped.compareAndSet(false, true)) {
            return;
        }
        System.out.println("Shutting down JFR streaming...");
        streamer.stop();
    }

    // Enhanced event processing with Kubernetes metadata

    /**
     * Transformer that adds pod, namespace and JVM identity to each event and
     * copies every non-null field as a string.
     */
    public static class KubernetesEventEnhancer implements FilteredJfrKafkaStreamer.EventTransformer {
        private final String podName;
        private final String namespace;
        // Looked up once; fully qualified because the class is not imported here.
        private final String jvmId =
                java.lang.management.ManagementFactory.getRuntimeMXBean().getName();

        public KubernetesEventEnhancer(String podName, String namespace) {
            this.podName = podName;
            this.namespace = namespace;
        }

        @Override
        public Map<String, Object> transform(RecordedEvent event) {
            Map<String, Object> enhanced = new HashMap<>();
            enhanced.put("eventType", event.getEventType().getName());
            enhanced.put("timestamp", event.getStartTime().toEpochMilli());
            enhanced.put("duration", event.getDuration().toNanos());
            enhanced.put("kubernetes_pod", podName);
            enhanced.put("kubernetes_namespace", namespace);
            enhanced.put("jvm_id", jvmId);
            // Add event-specific fields
            event.getFields().forEach(field -> {
                try {
                    Object value = event.getValue(field.getName());
                    if (value != null) {
                        enhanced.put(field.getName(), value.toString());
                    }
                } catch (Exception ignored) {
                    // Deliberately best-effort: a field that cannot be read is
                    // left out rather than failing the whole event.
                }
            });
            return enhanced;
        }
    }
}

Best Practices

1. Configuration Management

/**
 * Builds a {@link ConfigurableJfrKafkaStreamer} from environment variables
 * ({@code KAFKA_BOOTSTRAP_SERVERS}, {@code KAFKA_TOPIC}, {@code JFR_CONFIG}) and an
 * optional JSON config file, falling back to built-in defaults.
 */
public class JfrStreamerFactory {
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /**
     * Loads the config file named by {@code JFR_CONFIG} (default
     * "jfr-config.json"), fills in any missing bootstrap servers and serializers,
     * sets the topic from the environment and creates the streamer.
     *
     * @return a streamer that has not been started yet
     */
    public static ConfigurableJfrKafkaStreamer createFromEnvironment() {
        String bootstrapServers = getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        String topic = getEnv("KAFKA_TOPIC", "jfr-events");
        String configFile = getEnv("JFR_CONFIG", "jfr-config.json");
        JfrStreamerConfig config = loadConfig(configFile);
        // A config file may set the property to null; start from an empty set then.
        if (config.getKafkaProperties() == null) {
            config.setKafkaProperties(new Properties());
        }
        Properties kafkaProps = config.getKafkaProperties();
        kafkaProps.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // The streamer creates a KafkaProducer<String, String>; it cannot be
        // constructed without serializers, and neither the defaults nor a
        // config file are guaranteed to provide them.
        kafkaProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        kafkaProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        config.setTopic(topic);
        return new ConfigurableJfrKafkaStreamer(config);
    }

    /** Reads the JSON config; on any failure logs it and returns the defaults. */
    private static JfrStreamerConfig loadConfig(String configFile) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(new File(configFile), JfrStreamerConfig.class);
        } catch (Exception e) {
            System.err.println("Failed to load config, using defaults: " + e.getMessage());
            return createDefaultConfig();
        }
    }

    /** Default events (CPU load, GC, monitor enter) and producer tuning. */
    private static JfrStreamerConfig createDefaultConfig() {
        JfrStreamerConfig config = new JfrStreamerConfig();
        // Default event configuration
        Map<String, EventConfig> events = new HashMap<>();
        events.put("jdk.CPULoad", createEventConfig(Duration.ofSeconds(5), null));
        events.put("jdk.GarbageCollection", createEventConfig(Duration.ofSeconds(30), null));
        events.put("jdk.JavaMonitorEnter", createEventConfig(null, Duration.ofMillis(100)));
        config.setEventConfigs(events);
        // Default Kafka properties
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        config.setKafkaProperties(kafkaProps);
        return config;
    }

    /** Either argument may be null to leave that setting unset. */
    private static EventConfig createEventConfig(Duration period, Duration threshold) {
        EventConfig config = new EventConfig();
        config.setPeriod(period);
        config.setThreshold(threshold);
        return config;
    }

    /** Returns the environment variable's value, or the default when it is unset. */
    private static String getEnv(String key, String defaultValue) {
        String value = System.getenv(key);
        return value != null ? value : defaultValue;
    }
}

2. Monitoring and Health Checks

/**
 * Liveness check based on event recency: the streamer counts as healthy while
 * {@link #recordEvent()} has been called within the last two check intervals.
 */
public class JfrStreamerHealthCheck {
    private final ConfigurableJfrKafkaStreamer streamer;
    private final KafkaProducer<String, String> producer;
    // volatile: recordEvent() and isHealthy() are expected to run on different
    // threads (event handling vs. health polling).
    private volatile long lastEventTime = System.currentTimeMillis();
    private final long healthCheckInterval = 60000; // 1 minute

    /**
     * @param streamer streamer being monitored (kept for future checks)
     * @param producer producer being monitored (kept for future checks)
     */
    public JfrStreamerHealthCheck(ConfigurableJfrKafkaStreamer streamer,
                                  KafkaProducer<String, String> producer) {
        this.streamer = streamer;
        this.producer = producer;
    }

    /** @return false when no event was recorded for more than two intervals */
    public boolean isHealthy() {
        long currentTime = System.currentTimeMillis();
        // Check if we're receiving events regularly
        if (currentTime - lastEventTime > healthCheckInterval * 2) {
            return false; // No events for too long
        }
        // Additional health checks can be added here
        return true;
    }

    /** Marks "now" as the time of the most recent event. */
    public void recordEvent() {
        lastEventTime = System.currentTimeMillis();
    }

    /** @return map with "healthy", "lastEventTime" and "timeSinceLastEvent" (ms) */
    public Map<String, Object> getHealthStatus() {
        // Read the shared timestamp once so both time entries agree.
        long last = lastEventTime;
        Map<String, Object> status = new HashMap<>();
        status.put("healthy", isHealthy());
        status.put("lastEventTime", last);
        status.put("timeSinceLastEvent", System.currentTimeMillis() - last);
        return status;
    }
}

This comprehensive JFR to Kafka streaming implementation provides production-ready monitoring capabilities with proper error handling, configuration management, and Kubernetes support.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper