Streamlining Observability Data: JFR to Parquet for Big Data Analytics

Java Flight Recorder (JFR) produces rich telemetry data, but analyzing it at scale requires efficient storage and processing. Apache Parquet's columnar format provides the perfect solution for storing JFR events in big data environments. This guide explores converting JFR data to Parquet for scalable analytics and machine learning.


The Challenge: Scaling JFR Analysis

JFR generates extensive diagnostic data during runtime:

  • High-volume event streams (1000+ events/second)
  • Complex nested structures with varying schemas
  • Temporal relationships across event types
  • Mixed data types (primitives, strings, arrays)

Traditional approaches like CSV or JSON become inefficient at scale:

// Inefficient JSON representation
{
"events": [
{
"timestamp": 1672531200000,
"eventType": "jdk.CPULoad",
"values": {
"jvmUser": 0.45,
"jvmSystem": 0.15,
"machineTotal": 2.34
}
}
// ... thousands more events
]
}

Why Parquet for JFR Data?

Benefits:

  • Columnar storage - Efficient for analytical queries
  • Compression - 75-90% size reduction vs. JSON
  • Schema evolution - Handle changing JFR event schemas
  • Big data ecosystem integration - Spark, Presto, Hive
  • Predicate pushdown - Filter during read, not in memory

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                   JFR Binary Files                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  recording1 │  │  recording2 │  │  recordingN │         │
│  │    .jfr     │  │    .jfr     │  │    .jfr     │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└───────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────▼─────────────────────────────────────┐
│                  JFR Parser & Converter                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Event       │  │ Schema      │  │ Data Type   │         │
│  │ Extraction  │  │ Inference   │  │ Mapping     │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└───────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────▼─────────────────────────────────────┐
│                 Parquet Writer Engine                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Columnar    │  │ Compression │  │ Partition   │         │
│  │ Chunking    │  │ (SNAPPY,    │  │ Strategy    │         │
│  │             │  │   GZIP)     │  │             │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└───────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────▼─────────────────────────────────────┐
│                Partitioned Parquet Files                    │
│  ┌───────────────────┐  ┌───────────────────┐              │
│  │ event_type=       │  │ event_type=       │              │
│  │ jdk.CPULoad/      │  │ jdk.GCHeapSummary/│              │
│  │ part-0.parquet    │  │ part-0.parquet    │              │
│  └───────────────────┘  └───────────────────┘              │
└─────────────────────────────────────────────────────────────┘

Core Implementation

1. Dependencies Setup

<!-- pom.xml -->
<dependencies>
<!-- JFR Parser -->
<dependency>
<groupId>org.openjdk.jmc</groupId>
<artifactId>flightrecorder</artifactId>
<version>8.3.1</version>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.2</version>
</dependency>
<!-- Big Data Integration -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
<!-- Compression -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.3</version>
</dependency>
</dependencies>

2. JFR Event Extraction

public class JFRToParquetConverter {
private static final Logger logger = LoggerFactory.getLogger(JFRToParquetConverter.class);
public void convertJFRToParquet(Path jfrFilePath, Path outputDir) throws IOException {
try (IItemCollection events = readJFREvents(jfrFilePath)) {
// Group events by type for efficient columnar storage
Map<String, IItemCollection> eventsByType = events.groupBy(
ItemFilters.type(IAggregator.getEventTypeMap().keySet())
);
// Convert each event type to separate Parquet files
for (Map.Entry<String, IItemCollection> entry : eventsByType.entrySet()) {
String eventType = entry.getKey();
IItemCollection typeEvents = entry.getValue();
convertEventTypeToParquet(eventType, typeEvents, outputDir);
}
}
}
private IItemCollection readJFREvents(Path jfrFilePath) throws IOException {
try {
return JfrLoaderToolkit.loadEvents(jfrFilePath.toFile());
} catch (Exception e) {
throw new IOException("Failed to parse JFR file: " + jfrFilePath, e);
}
}
}

3. Schema Inference and Mapping

public class JFRSchemaInferrer {
public Schema inferSchema(String eventType, IItemCollection events) {
if (events.isEmpty()) {
return createEmptySchema(eventType);
}
// Sample first event to infer schema
IItemIterable<?> itemIterable = events.iterator().next();
IItem firstEvent = itemIterable.iterator().next();
SchemaBuilder.FieldAssembler<Schema> schemaBuilder = SchemaBuilder
.record(eventType.replace(".", "_"))
.namespace("com.jfr.parquet")
.fields();
// Extract fields from JFR event
addFieldsFromEvent(schemaBuilder, firstEvent);
return schemaBuilder.endRecord();
}
private void addFieldsFromEvent(SchemaBuilder.FieldAssembler<Schema> builder, IItem event) {
// Common metadata fields
builder
.name("timestamp").type().longType().noDefault()
.name("duration").type().longType().noDefault()
.name("eventThread").type().stringType().noDefault()
.name("eventType").type().stringType().noDefault();
// Event-specific fields
for (IMemberAccessor accessor : event.getType().getAccessors()) {
String fieldName = accessor.getIdentifier();
Class<?> fieldType = accessor.getType();
Schema fieldSchema = mapJFRTypeToAvro(fieldType, fieldName);
if (fieldSchema != null) {
builder.name(fieldName).type(fieldSchema).noDefault();
}
}
}
private Schema mapJFRTypeToAvro(Class<?> jfrType, String fieldName) {
if (jfrType == String.class) {
return Schema.create(Schema.Type.STRING);
} else if (jfrType == Long.class || jfrType == long.class) {
return Schema.create(Schema.Type.LONG);
} else if (jfrType == Integer.class || jfrType == int.class) {
return Schema.create(Schema.Type.INT);
} else if (jfrType == Double.class || jfrType == double.class) {
return Schema.create(Schema.Type.DOUBLE);
} else if (jfrType == Float.class || jfrType == float.class) {
return Schema.create(Schema.Type.FLOAT);
} else if (jfrType == Boolean.class || jfrType == boolean.class) {
return Schema.create(Schema.Type.BOOLEAN);
} else if (jfrType.isArray()) {
return Schema.createArray(mapJFRTypeToAvro(jfrType.getComponentType(), fieldName));
} else {
logger.warn("Unsupported JFR type: {} for field {}", jfrType, fieldName);
return Schema.create(Schema.Type.STRING); // Fallback to string
}
}
}

4. Parquet Writing Engine

public class ParquetWriterEngine {
private final Configuration hadoopConfig;
public ParquetWriterEngine() {
this.hadoopConfig = new Configuration();
this.hadoopConfig.set("parquet.compression", "SNAPPY");
}
public <T> void writeEventsToParquet(
String eventType, 
List<T> events, 
Schema schema, 
Path outputPath) throws IOException {
GenericData model = GenericData.get();
ParquetWriter<GenericRecord> writer = createWriter(schema, outputPath);
try {
for (T event : events) {
GenericRecord record = convertToGenericRecord(event, schema, model);
writer.write(record);
}
} finally {
writer.close();
}
logger.info("Written {} events of type {} to {}", events.size(), eventType, outputPath);
}
private ParquetWriter<GenericRecord> createWriter(Schema schema, Path outputPath) throws IOException {
return AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(outputPath.toString()))
.withSchema(schema)
.withConf(hadoopConfig)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withRowGroupSize(128 * 1024 * 1024) // 128MB row groups
.withPageSize(1 * 1024 * 1024) // 1MB pages
.build();
}
private GenericRecord convertToGenericRecord(Object event, Schema schema, GenericData model) {
GenericRecord record = new GenericData.Record(schema);
// Map JFR event fields to Avro record
// Implementation depends on specific JFR event structure
populateRecordFromEvent(record, event, schema);
return record;
}
}

Advanced Event Processing

1. Efficient Batch Processing

public class BatchJFRProcessor {
private final ExecutorService executor;
private final JFRToParquetConverter converter;
public BatchJFRProcessor(int parallelism) {
this.executor = Executors.newFixedThreadPool(parallelism);
this.converter = new JFRToParquetConverter();
}
public CompletableFuture<Void> processDirectory(Path inputDir, Path outputDir) {
try {
List<Path> jfrFiles = Files.walk(inputDir)
.filter(path -> path.toString().endsWith(".jfr"))
.collect(Collectors.toList());
List<CompletableFuture<Void>> futures = jfrFiles.stream()
.map(jfrFile -> processSingleFile(jfrFile, outputDir))
.collect(Collectors.toList());
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
}
}
private CompletableFuture<Void> processSingleFile(Path jfrFile, Path outputDir) {
return CompletableFuture.runAsync(() -> {
try {
String fileName = jfrFile.getFileName().toString();
Path fileOutputDir = outputDir.resolve(fileName.replace(".jfr", ""));
converter.convertJFRToParquet(jfrFile, fileOutputDir);
logger.info("Processed JFR file: {}", jfrFile);
} catch (Exception e) {
logger.error("Failed to process JFR file: {}", jfrFile, e);
throw new CompletionException(e);
}
}, executor);
}
public void shutdown() {
executor.shutdown();
}
}

2. Memory-Efficient Streaming Conversion

public class StreamingJFRConverter {
private final int batchSize;
private final Map<String, List<GenericRecord>> eventBuffers;
private final ParquetWriterEngine writerEngine;
public StreamingJFRConverter(int batchSize, Path baseOutputDir) {
this.batchSize = batchSize;
this.eventBuffers = new HashMap<>();
this.writerEngine = new ParquetWriterEngine();
this.baseOutputDir = baseOutputDir;
}
public void processEvent(IItem event) {
String eventType = event.getType().getIdentifier();
GenericRecord record = convertJFREventToRecord(event);
eventBuffers.computeIfAbsent(eventType, k -> new ArrayList<>())
.add(record);
// Flush batch when buffer reaches threshold
if (eventBuffers.get(eventType).size() >= batchSize) {
flushEventType(eventType);
}
}
private void flushEventType(String eventType) {
List<GenericRecord> buffer = eventBuffers.get(eventType);
if (buffer == null || buffer.isEmpty()) return;
try {
Schema schema = inferSchemaFromRecords(eventType, buffer);
Path outputPath = getOutputPath(eventType, schema);
writerEngine.writeEventsToParquet(eventType, buffer, schema, outputPath);
// Clear buffer after successful write
buffer.clear();
} catch (IOException e) {
logger.error("Failed to flush events for type: {}", eventType, e);
}
}
private Path getOutputPath(String eventType, Schema schema) {
String safeEventType = eventType.replace(".", "_")
.replace(":", "_");
return baseOutputDir.resolve("event_type=" + safeEventType)
.resolve(UUID.randomUUID() + ".parquet");
}
}

Big Data Integration

1. Spark Analytics Integration

// JFR Analytics with Spark
object JFRAnalytics {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("JFR Analytics")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.parquet.filterPushdown", "true")
.getOrCreate()
// Read partitioned Parquet files
val jfrDF = spark.read.parquet("/data/jfr/event_type=*/*.parquet")
// Register for SQL queries
jfrDF.createOrReplaceTempView("jfr_events")
// Perform complex analytics
val gcAnalysis = analyzeGCEvents(spark)
val cpuAnalysis = analyzeCPULoad(spark)
val memoryAnalysis = analyzeMemoryUsage(spark)
// Save results
gcAnalysis.write.parquet("/analytics/gc_metrics")
cpuAnalysis.write.parquet("/analytics/cpu_metrics")
spark.stop()
}
def analyzeGCEvents(spark: SparkSession): DataFrame = {
spark.sql("""
SELECT 
window.start as time_bucket,
eventType,
AVG(cast(values['usedHeap'] as double)) as avg_used_heap,
AVG(cast(values['usedNonHeap'] as double)) as avg_used_nonheap,
COUNT(*) as gc_count,
SUM(cast(duration as double)) as total_gc_time
FROM jfr_events
WHERE eventType = 'jdk.GCHeapSummary'
GROUP BY window(timestamp, '5 minutes'), eventType
ORDER BY time_bucket
""")
}
def analyzeCPULoad(spark: SparkSession): DataFrame = {
spark.sql("""
SELECT 
DATE_FORMAT(timestamp, 'yyyy-MM-dd HH:mm') as minute,
AVG(cast(values['jvmUser'] as double)) as avg_jvm_user,
AVG(cast(values['jvmSystem'] as double)) as avg_jvm_system,
AVG(cast(values['machineTotal'] as double)) as avg_machine_total,
PERCENTILE_APPROX(cast(values['machineTotal'] as double), 0.95) as p95_machine_total
FROM jfr_events
WHERE eventType = 'jdk.CPULoad'
GROUP BY DATE_FORMAT(timestamp, 'yyyy-MM-dd HH:mm')
ORDER BY minute
""")
}
}

2. Presto/Trino Query Examples

-- Query JFR data in Presto/Trino
SELECT 
event_type,
date_trunc('hour', from_unixtime(timestamp / 1000)) as hour_bucket,
count(*) as event_count,
avg(cast(json_extract_scalar(values, '$.usedHeap') as double)) as avg_heap_used
FROM jfr_events 
WHERE event_type = 'jdk.GCHeapSummary'
AND date_trunc('day', from_unixtime(timestamp / 1000)) = current_date
GROUP BY event_type, date_trunc('hour', from_unixtime(timestamp / 1000))
ORDER BY hour_bucket;
-- Correlation analysis between GC and CPU
SELECT 
gc.hour_bucket,
gc.avg_gc_time,
cpu.avg_cpu_load,
corr(gc.avg_gc_time, cpu.avg_cpu_load) as correlation
FROM (
SELECT 
date_trunc('hour', from_unixtime(timestamp / 1000)) as hour_bucket,
avg(cast(json_extract_scalar(values, '$.duration') as double)) as avg_gc_time
FROM jfr_events 
WHERE event_type = 'jdk.GarbageCollection'
GROUP BY date_trunc('hour', from_unixtime(timestamp / 1000))
) gc
JOIN (
SELECT 
date_trunc('hour', from_unixtime(timestamp / 1000)) as hour_bucket,
avg(cast(json_extract_scalar(values, '$.machineTotal') as double)) as avg_cpu_load
FROM jfr_events 
WHERE event_type = 'jdk.CPULoad'
GROUP BY date_trunc('hour', from_unixtime(timestamp / 1000))
) cpu ON gc.hour_bucket = cpu.hour_bucket;

Machine Learning Applications

1. Feature Engineering for Anomaly Detection

public class JFRFeatureEngine {
public Dataset<Row> extractFeatures(SparkSession spark, String parquetPath) {
Dataset<Row> events = spark.read().parquet(parquetPath);
// Feature engineering for different event types
Dataset<Row> gcFeatures = extractGCFeatures(events);
Dataset<Row> cpuFeatures = extractCPUFeatures(events);
Dataset<Row> memoryFeatures = extractMemoryFeatures(events);
// Join all features
return gcFeatures.join(cpuFeatures, "time_bucket")
.join(memoryFeatures, "time_bucket");
}
private Dataset<Row> extractGCFeatures(Dataset<Row> events) {
return events.filter("eventType = 'jdk.GarbageCollection'")
.groupBy(functions.window(col("timestamp"), "5 minutes"))
.agg(
count("*").alias("gc_count"),
avg("values.duration").alias("avg_gc_duration"),
sum("values.duration").alias("total_gc_time"),
expr("percentile_approx(values.duration, 0.95)").alias("p95_gc_duration")
)
.withColumn("time_bucket", col("window.start"));
}
private Dataset<Row> extractCPUFeatures(Dataset<Row> events) {
return events.filter("eventType = 'jdk.CPULoad'")
.groupBy(functions.window(col("timestamp"), "5 minutes"))
.agg(
avg("values.jvmUser").alias("avg_jvm_user_cpu"),
avg("values.jvmSystem").alias("avg_jvm_system_cpu"),
avg("values.machineTotal").alias("avg_total_cpu"),
stddev("values.machineTotal").alias("cpu_volatility")
)
.withColumn("time_bucket", col("window.start"));
}
public Dataset<Row> prepareTrainingData(Dataset<Row> features, Dataset<Row> labels) {
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{
"gc_count", "avg_gc_duration", "total_gc_time", 
"avg_jvm_user_cpu", "avg_jvm_system_cpu", "cpu_volatility"
})
.setOutputCol("features");
return assembler.transform(features.join(labels, "time_bucket"));
}
}

2. Anomaly Detection Model

// Anomaly detection using Isolation Forest
object JFRAnomalyDetection {
def trainAnomalyDetectionModel(trainingData: Dataset[Row]): IsolationForestModel = {
val isolationForest = new IsolationForest()
.setNumEstimators(100)
.setBootstrap(false)
.setMaxSamples(256)
.setMaxFeatures(1.0)
.setFeaturesCol("features")
.setPredictionCol("prediction")
.setScoreCol("anomaly_score")
.setContamination(0.1)
.setRandomSeed(42)
isolationForest.fit(trainingData)
}
def detectAnomalies(model: IsolationForestModel, features: Dataset[Row]): Dataset[Row] = {
val predictions = model.transform(features)
predictions.filter("prediction = 1") // 1 indicates anomaly
}
}

Production Deployment

1. Kubernetes-based Processing Pipeline

# kubernetes/jfr-processor.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: jfr-parquet-processor
spec:
replicas: 3
selector:
matchLabels:
app: jfr-processor
template:
metadata:
labels:
app: jfr-processor
spec:
containers:
- name: processor
image: company/jfr-parquet:1.0.0
env:
- name: INPUT_S3_PATH
value: "s3://jfr-recordings/"
- name: OUTPUT_S3_PATH  
value: "s3://jfr-parquet/"
- name: BATCH_SIZE
value: "10000"
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: jfr-processor-config

2. Monitoring and Metrics

@RestController
public class ProcessingMetrics {
private final MeterRegistry meterRegistry;
private final Counter processedEvents;
private final Timer conversionTimer;
public ProcessingMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.processedEvents = meterRegistry.counter("jfr.events.processed");
this.conversionTimer = meterRegistry.timer("jfr.conversion.duration");
}
@EventListener
public void handleConversionComplete(ConversionCompleteEvent event) {
processedEvents.increment(event.getEventCount());
Map<String, String> tags = Map.of(
"eventType", event.getEventType(),
"status", event.getStatus()
);
meterRegistry.counter("jfr.conversion.complete", tags).increment();
}
public void recordConversionTime(Runnable conversionTask) {
conversionTimer.record(conversionTask);
}
}

Performance Optimization

1. Compression Benchmarking

public class CompressionBenchmark {
public void benchmarkCompression() {
Map<String, CompressionCodecName> codecs = Map.of(
"UNCOMPRESSED", CompressionCodecName.UNCOMPRESSED,
"SNAPPY", CompressionCodecName.SNAPPY,
"GZIP", CompressionCodecName.GZIP,
"LZ4", CompressionCodecName.LZ4
);
for (Map.Entry<String, CompressionCodecName> entry : codecs.entrySet()) {
long startTime = System.currentTimeMillis();
Path outputPath = Paths.get("/tmp/benchmark_" + entry.getKey() + ".parquet");
long fileSize = writeWithCompression(testEvents, entry.getValue(), outputPath);
long duration = System.currentTimeMillis() - startTime;
System.out.printf("%s: %d bytes, %d ms, %.2f MB/s%n",
entry.getKey(), fileSize, duration,
(fileSize / (1024.0 * 1024.0)) / (duration / 1000.0));
}
}
}

2. Partitioning Strategy

public class SmartPartitioning {
public Path getPartitionedPath(IItem event, Path basePath) {
String eventType = event.getType().getIdentifier();
long timestamp = event.getStartTime().getTime();
// Partition by event type and date
LocalDate eventDate = Instant.ofEpochMilli(timestamp)
.atZone(ZoneId.systemDefault())
.toLocalDate();
return basePath.resolve("event_type=" + eventType)
.resolve("date=" + eventDate.toString())
.resolve(generateFileName());
}
public void optimizePartitionLayout(SparkSession spark, Path parquetPath) {
// Use Spark to optimize small files
spark.sql("OPTIMIZE parquet.`" + parquetPath + "` ZORDER BY (timestamp, eventType)");
}
}

Conclusion

Converting JFR to Parquet format enables:

  • Scalable analytics on performance data using big data tools
  • Cost-effective storage with high compression ratios
  • Fast query performance through columnar storage and predicate pushdown
  • Machine learning readiness with structured feature extraction
  • Long-term retention of performance telemetry for trend analysis

This approach transforms JFR from a debugging tool into a strategic observability asset that can drive performance optimization, capacity planning, and proactive issue detection across the entire application lifecycle. The combination of JFR's detailed runtime insights with Parquet's analytical capabilities creates a powerful foundation for data-driven performance engineering.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper