Nuclio High-Performance Functions in Java: Building Serverless Functions for Real-Time Processing

Nuclio is a high-performance serverless framework designed for real-time data processing. This guide covers building, configuring, and deploying Java functions for Nuclio, with an emphasis on low latency and high throughput.


Core Concepts

What is Nuclio?

  • High-performance serverless/functions platform
  • Real-time data processing capabilities
  • Supports multiple runtimes including Java
  • Optimized for low latency and high throughput

Key Features:

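  • Cold Start Optimization: Fast function startup; actual times depend on the runtime, and JVM functions usually take longer to start than lightweight runtimes
  • Data Bindings: Real-time connections to data sources
  • High Throughput: Up to hundreds of thousands of events per second per function instance, according to the Nuclio project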
  • Flexible Triggers: HTTP, Cron, Message queues, Streams

Dependencies and Setup

1. Maven Dependencies
<properties>
  <nuclio-sdk.version>1.7.0</nuclio-sdk.version>
  <jackson.version>2.15.2</jackson.version>
  <micrometer.version>1.11.0</micrometer.version>
</properties>
<dependencies>
  <!-- Nuclio Java SDK -->
  <dependency>
    <groupId>io.nuclio</groupId>
    <artifactId>nuclio-sdk-java</artifactId>
    <version>${nuclio-sdk.version}</version>
  </dependency>
  <!-- JSON Processing -->
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
  </dependency>
  <!-- Metrics -->
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-core</artifactId>
    <version>${micrometer.version}</version>
  </dependency>
  <!-- Logging -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.7</version>
  </dependency>
</dependencies>
2. Project Structure
nuclio-java-functions/
├── src/
│   └── main/
│       └── java/
│           └── com/
│               └── example/
│                   └── nuclio/
│                       ├── functions/
│                       ├── handlers/
│                       ├── models/
│                       └── utils/
├── function.yaml
├── Dockerfile
└── pom.xml

Core Implementation

1. Basic Nuclio Function
// src/main/java/com/example/nuclio/handlers/SimpleFunction.java
package com.example.nuclio.handlers;
import io.nuclio.Context;
import io.nuclio.Event;
import io.nuclio.EventHandler;
import io.nuclio.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleFunction implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(SimpleFunction.class);
@Override
public Response handleEvent(Context context, Event event) {
try {
logger.info("Received event: {}", event.getPath());
String requestBody = event.getBodyString();
logger.debug("Request body: {}", requestBody);
// Process the event
String result = processRequest(requestBody);
// Return successful response
return Response.builder()
.body(result)
.contentType("application/json")
.statusCode(200)
.build();
} catch (Exception e) {
logger.error("Error processing event", e);
return Response.builder()
.body("{\"error\": \"Processing failed\"}")
.contentType("application/json")
.statusCode(500)
.build();
}
}
private String processRequest(String requestBody) {
// Simple echo function with timestamp
return String.format("""
{
"status": "success",
"message": "Request processed",
"timestamp": %d,
"echo": %s
}
""", System.currentTimeMillis(), requestBody != null ? requestBody : "null");
}
}
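
The echo in processRequest splices the raw request body into the JSON template, so the response is valid JSON only when the body itself is valid JSON. A minimal sketch of one way to guard against that, assuming a hypothetical helper named JsonText (it is not part of the Nuclio SDK) that turns any string into a quoted JSON string literal:

```java
// Hypothetical helper for illustration; not part of the Nuclio SDK.
final class JsonText {
    private JsonText() {}

    /** Returns the input as a quoted JSON string literal, or the JSON literal null. */
    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder out = new StringBuilder(s.length() + 2);
        out.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be \u-escaped in JSON
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        out.append('"');
        return out.toString();
    }
}
```

With a helper like this, the template's echo placeholder would receive JsonText.quote(requestBody) instead of the raw body. The handlers later in this guide use Jackson's ObjectMapper, which handles escaping for you.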
2. High-Performance JSON Processing Function
// src/main/java/com/example/nuclio/handlers/JsonProcessor.java
package com.example.nuclio.handlers;
import com.example.nuclio.models.ProcessRequest;
import com.example.nuclio.models.ProcessResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nuclio.Context;
import io.nuclio.Event;
import io.nuclio.EventHandler;
import io.nuclio.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
public class JsonProcessor implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(JsonProcessor.class);
private static final AtomicLong requestCounter = new AtomicLong(0);
private final ObjectMapper objectMapper;
public JsonProcessor() {
this.objectMapper = new ObjectMapper();
// Register Jackson modules found on the classpath; the mapper is created once and reused across requests
this.objectMapper.findAndRegisterModules();
}
@Override
public Response handleEvent(Context context, Event event) {
long requestId = requestCounter.incrementAndGet();
long startTime = System.nanoTime();
try {
logger.debug("[Request-{}] Processing JSON request", requestId);
// Parse request
ProcessRequest request = objectMapper.readValue(
event.getBody(), ProcessRequest.class);
// Process data
ProcessResponse response = processData(request, requestId);
// Convert to JSON
String responseBody = objectMapper.writeValueAsString(response);
long duration = (System.nanoTime() - startTime) / 1_000_000; // ms
logger.info("[Request-{}] Processed in {}ms", requestId, duration);
return Response.builder()
.body(responseBody)
.contentType("application/json")
.header("X-Request-ID", String.valueOf(requestId))
.header("X-Processing-Time", duration + "ms")
.statusCode(200)
.build();
} catch (Exception e) {
logger.error("[Request-{}] Processing failed", requestId, e);
return Response.builder()
.body("{\"error\": \"Invalid request format\"}")
.contentType("application/json")
.statusCode(400)
.build();
}
}
private ProcessResponse processData(ProcessRequest request, long requestId) {
// Simulate various processing operations
ProcessResponse response = new ProcessResponse();
response.setRequestId(requestId);
response.setStatus("processed");
response.setTimestamp(System.currentTimeMillis());
// Transform data
if (request.getData() != null) {
String transformedData = request.getData().toUpperCase();
response.setTransformedData(transformedData);
// Calculate some metrics
response.setDataLength(transformedData.length());
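// Blank input has zero words; trim first so leading whitespace does not add an empty token
response.setWordCount(transformedData.isBlank() ? 0 : transformedData.trim().split("\\s+").length);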
}
// Add processing metadata
response.setProcessingNode(System.getenv().getOrDefault("HOSTNAME", "unknown"));
return response;
}
}
// Request/Response models (each class lives in its own file)
// src/main/java/com/example/nuclio/models/ProcessRequest.java
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ProcessRequest {
private String data;
private String operation;
@JsonProperty
public String getData() { return data; }
public void setData(String data) { this.data = data; }
@JsonProperty
public String getOperation() { return operation; }
public void setOperation(String operation) { this.operation = operation; }
}
// src/main/java/com/example/nuclio/models/ProcessResponse.java
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ProcessResponse {
private long requestId;
private String status;
private long timestamp;
private String transformedData;
private int dataLength;
private int wordCount;
private String processingNode;
// Getters and setters
@JsonProperty public long getRequestId() { return requestId; }
public void setRequestId(long requestId) { this.requestId = requestId; }
@JsonProperty public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
@JsonProperty public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@JsonProperty public String getTransformedData() { return transformedData; }
public void setTransformedData(String transformedData) { this.transformedData = transformedData; }
@JsonProperty public int getDataLength() { return dataLength; }
public void setDataLength(int dataLength) { this.dataLength = dataLength; }
@JsonProperty public int getWordCount() { return wordCount; }
public void setWordCount(int wordCount) { this.wordCount = wordCount; }
@JsonProperty public String getProcessingNode() { return processingNode; }
public void setProcessingNode(String processingNode) { this.processingNode = processingNode; }
}
3. High-Throughput Stream Processor
// src/main/java/com/example/nuclio/handlers/StreamProcessor.java
package com.example.nuclio.handlers;
import io.nuclio.Context;
import io.nuclio.Event;
import io.nuclio.EventHandler;
import io.nuclio.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class StreamProcessor implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class);
// In-memory state for stream processing
private final Map<String, AtomicLong> keyCounters;
private final AtomicLong totalEventsProcessed;
private final AtomicLong totalProcessingTime;
public StreamProcessor() {
this.keyCounters = new ConcurrentHashMap<>();
this.totalEventsProcessed = new AtomicLong(0);
this.totalProcessingTime = new AtomicLong(0);
}
@Override
public Response handleEvent(Context context, Event event) {
long startTime = System.nanoTime();
long eventId = totalEventsProcessed.incrementAndGet();
try {
// Extract key from event
String key = extractKey(event);
// Process stream event
ProcessResult result = processStreamEvent(key, event.getBodyString());
// Update counters
updateCounters(key);
long processingTime = System.nanoTime() - startTime;
totalProcessingTime.addAndGet(processingTime);
if (eventId % 1000 == 0) {
logMetrics(eventId);
}
return Response.builder()
.body(buildResponse(result, processingTime))
.contentType("application/json")
.statusCode(200)
.build();
} catch (Exception e) {
logger.error("Error processing stream event {}", eventId, e);
return Response.builder()
.body("{\"error\": \"Stream processing failed\"}")
.contentType("application/json")
.statusCode(500)
.build();
}
}
private String extractKey(Event event) {
// Extract key from headers or body
String key = event.getHeader("X-Stream-Key");
if (key != null) {
return key;
}
// Fallback to path-based key
String path = event.getPath();
if (path != null && path.contains("/")) {
return path.substring(path.lastIndexOf("/") + 1);
}
return "default";
}
private ProcessResult processStreamEvent(String key, String body) {
ProcessResult result = new ProcessResult();
result.setKey(key);
result.setProcessed(true);
result.setTimestamp(System.currentTimeMillis());
// Simulate stream processing
if (body != null) {
result.setBodyLength(body.length());
// Simple processing: count words and characters
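// Blank input has zero words; trim first so leading whitespace does not add an empty token
result.setWordCount(body.isBlank() ? 0 : body.trim().split("\\s+").length);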
result.setCharCount(body.length());
}
return result;
}
private void updateCounters(String key) {
keyCounters.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
}
private void logMetrics(long eventId) {
long avgProcessingTime = totalProcessingTime.get() / eventId;
logger.info("Processed {} events, average time: {} ns, unique keys: {}", 
eventId, avgProcessingTime, keyCounters.size());
}
private String buildResponse(ProcessResult result, long processingTime) {
return String.format("""
{
"key": "%s",
"processed": %b,
"timestamp": %d,
"body_length": %d,
"word_count": %d,
"char_count": %d,
"processing_time_ns": %d
}
""", result.getKey(), result.isProcessed(), result.getTimestamp(),
result.getBodyLength(), result.getWordCount(), result.getCharCount(),
processingTime);
}
// Inner class for process results
private static class ProcessResult {
private String key;
private boolean processed;
private long timestamp;
private int bodyLength;
private int wordCount;
private int charCount;
// Getters and setters
public String getKey() { return key; }
public void setKey(String key) { this.key = key; }
public boolean isProcessed() { return processed; }
public void setProcessed(boolean processed) { this.processed = processed; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public int getBodyLength() { return bodyLength; }
public void setBodyLength(int bodyLength) { this.bodyLength = bodyLength; }
public int getWordCount() { return wordCount; }
public void setWordCount(int wordCount) { this.wordCount = wordCount; }
public int getCharCount() { return charCount; }
public void setCharCount(int charCount) { this.charCount = charCount; }
}
}
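
StreamProcessor keeps one AtomicLong per key. When many workers update the same hot key, the JDK's LongAdder tends to scale better for write-heavy counters, at the cost of a slightly more expensive read. A minimal sketch of the same per-key counting with LongAdder; the class name KeyCounters is an assumption made for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-key counter for illustration.
final class KeyCounters {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Adds one to the counter for the key, creating the counter on first use. */
    void increment(String key) {
        counts.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    /** Returns the current count for the key, or 0 if the key was never seen. */
    long get(String key) {
        LongAdder adder = counts.get(key);
        return adder == null ? 0L : adder.sum();
    }

    /** Number of distinct keys seen so far. */
    int uniqueKeys() {
        return counts.size();
    }
}
```

Whether this is worth the change depends on how skewed the key distribution is; with many cold keys the difference is likely negligible.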
4. Real-Time Data Enrichment Function
// src/main/java/com/example/nuclio/handlers/DataEnrichmentFunction.java
package com.example.nuclio.handlers;
import com.example.nuclio.models.EnrichmentRequest;
import com.example.nuclio.models.EnrichmentResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nuclio.Context;
import io.nuclio.Event;
import io.nuclio.EventHandler;
import io.nuclio.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class DataEnrichmentFunction implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(DataEnrichmentFunction.class);
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final AtomicLong requestCounter;
private final ConcurrentHashMap<String, String> cache;
// External service endpoints (could be configured via environment variables)
private final String userServiceUrl;
private final String geoServiceUrl;
private final String fraudServiceUrl;
public DataEnrichmentFunction() {
this.objectMapper = new ObjectMapper();
this.objectMapper.findAndRegisterModules();
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
this.requestCounter = new AtomicLong(0);
this.cache = new ConcurrentHashMap<>();
// Get service URLs from environment
this.userServiceUrl = System.getenv().getOrDefault("USER_SERVICE_URL", 
"http://user-service:8080");
this.geoServiceUrl = System.getenv().getOrDefault("GEO_SERVICE_URL", 
"http://geo-service:8080");
this.fraudServiceUrl = System.getenv().getOrDefault("FRAUD_SERVICE_URL", 
"http://fraud-service:8080");
}
@Override
public Response handleEvent(Context context, Event event) {
long requestId = requestCounter.incrementAndGet();
long startTime = System.currentTimeMillis();
try {
logger.info("[Request-{}] Starting data enrichment", requestId);
// Parse request
EnrichmentRequest request = objectMapper.readValue(
event.getBody(), EnrichmentRequest.class);
// Run the enrichment steps on a background thread
CompletableFuture<EnrichmentResponse> enrichmentFuture = 
enrichDataAsync(request, requestId);
// Wait for completion with timeout
EnrichmentResponse response = enrichmentFuture
.orTimeout(5000, java.util.concurrent.TimeUnit.MILLISECONDS)
.join();
long processingTime = System.currentTimeMillis() - startTime;
response.setProcessingTimeMs(processingTime);
response.setRequestId(requestId);
String responseBody = objectMapper.writeValueAsString(response);
logger.info("[Request-{}] Enrichment completed in {}ms", 
requestId, processingTime);
return Response.builder()
.body(responseBody)
.contentType("application/json")
.header("X-Request-ID", String.valueOf(requestId))
.header("X-Processing-Time", processingTime + "ms")
.statusCode(200)
.build();
} catch (Exception e) {
logger.error("[Request-{}] Enrichment failed", requestId, e);
return Response.builder()
.body("{\"error\": \"Data enrichment failed\", \"request_id\": " + requestId + "}")
.contentType("application/json")
.statusCode(500)
.build();
}
}
private CompletableFuture<EnrichmentResponse> enrichDataAsync(EnrichmentRequest request, long requestId) {
EnrichmentResponse response = new EnrichmentResponse();
response.setTimestamp(System.currentTimeMillis());
// The steps below run one after another inside a single async task, not in parallel
return CompletableFuture.supplyAsync(() -> {
try {
// 1. User data enrichment
if (request.getUserId() != null) {
enrichUserData(request.getUserId(), response);
}
// 2. Geographic enrichment
if (request.getIpAddress() != null) {
enrichGeoData(request.getIpAddress(), response);
}
// 3. Fraud check (if applicable)
if (request.getAmount() != null && request.getAmount() > 1000) {
checkFraudRisk(request, response);
}
// 4. Business logic enrichment
applyBusinessRules(request, response);
response.setSuccess(true);
logger.debug("[Request-{}] Async enrichment completed", requestId);
} catch (Exception e) {
logger.warn("[Request-{}] Partial enrichment failure", requestId, e);
response.setSuccess(false);
response.setErrorMessage("Partial enrichment completed with errors");
}
return response;
});
}
private void enrichUserData(String userId, EnrichmentResponse response) {
try {
String cacheKey = "user:" + userId;
String cachedData = cache.get(cacheKey);
if (cachedData != null) {
// Use cached data
response.setUserData(cachedData);
response.setUserDataFromCache(true);
return;
}
// Call user service
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(userServiceUrl + "/users/" + userId))
.timeout(Duration.ofSeconds(1))
.header("Accept", "application/json")
.build();
HttpResponse<String> httpResponse = httpClient.send(
request, HttpResponse.BodyHandlers.ofString());
if (httpResponse.statusCode() == 200) {
response.setUserData(httpResponse.body());
// Cache the response body; note that this map never expires or evicts entries
cache.put(cacheKey, httpResponse.body());
}
} catch (Exception e) {
logger.warn("User data enrichment failed for user: {}", userId, e);
response.setUserData("enrichment_failed");
}
}
private void enrichGeoData(String ipAddress, EnrichmentResponse response) {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(geoServiceUrl + "/geo/" + ipAddress))
.timeout(Duration.ofSeconds(1))
.build();
HttpResponse<String> httpResponse = httpClient.send(
request, HttpResponse.BodyHandlers.ofString());
if (httpResponse.statusCode() == 200) {
response.setGeoData(httpResponse.body());
}
} catch (Exception e) {
logger.warn("Geo data enrichment failed for IP: {}", ipAddress, e);
}
}
private void checkFraudRisk(EnrichmentRequest request, EnrichmentResponse response) {
try {
String fraudCheckRequest = objectMapper.writeValueAsString(request);
HttpRequest httpRequest = HttpRequest.newBuilder()
.uri(URI.create(fraudServiceUrl + "/check"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(fraudCheckRequest))
.timeout(Duration.ofSeconds(2))
.build();
HttpResponse<String> httpResponse = httpClient.send(
httpRequest, HttpResponse.BodyHandlers.ofString());
if (httpResponse.statusCode() == 200) {
response.setFraudCheckResult(httpResponse.body());
}
} catch (Exception e) {
logger.warn("Fraud check failed", e);
}
}
private void applyBusinessRules(EnrichmentRequest request, EnrichmentResponse response) {
// Apply business-specific enrichment rules
if (request.getAmount() != null) {
if (request.getAmount() > 5000) {
response.setRiskLevel("HIGH");
} else if (request.getAmount() > 1000) {
response.setRiskLevel("MEDIUM");
} else {
response.setRiskLevel("LOW");
}
}
// Add business context
response.setBusinessUnit("e-commerce");
response.setEnrichmentVersion("1.0");
}
}
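
In enrichDataAsync the user, geo, and fraud steps are called one after another inside a single supplyAsync task, so their latencies add up. A minimal sketch of running two independent lookups concurrently and combining the results, assuming hypothetical names: ParallelEnrichment, Result, and the lookupUser and lookupGeo functions stand in for the HTTP calls and EnrichmentResponse above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch; the lookup functions stand in for the HTTP calls above.
final class ParallelEnrichment {
    private ParallelEnrichment() {}

    /** Holds the two lookup results; a stand-in for EnrichmentResponse. */
    static final class Result {
        final String userData;
        final String geoData;

        Result(String userData, String geoData) {
            this.userData = userData;
            this.geoData = geoData;
        }
    }

    static Result enrich(String userId, String ipAddress,
                         Function<String, String> lookupUser,
                         Function<String, String> lookupGeo,
                         ExecutorService pool, long timeoutMs) {
        // Each lookup runs as its own task, so the two can overlap in time
        CompletableFuture<String> user = CompletableFuture
                .supplyAsync(() -> lookupUser.apply(userId), pool)
                .exceptionally(e -> "enrichment_failed"); // same fallback as enrichUserData
        CompletableFuture<String> geo = CompletableFuture
                .supplyAsync(() -> lookupGeo.apply(ipAddress), pool)
                .exceptionally(e -> null); // geo data is optional
        // Combine once both complete; join() throws CompletionException on timeout
        return user.thenCombine(geo, Result::new)
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .join();
    }
}
```

A failure in one lookup falls back to a default instead of failing the whole request, which mirrors the per-step try/catch blocks in the handler. A dedicated pool is passed in because blocking HTTP calls can starve the common ForkJoinPool.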
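// Enrichment models (each class lives in its own file)
// src/main/java/com/example/nuclio/models/EnrichmentRequest.java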
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
public class EnrichmentRequest {
private String userId;
private String ipAddress;
private Double amount;
private String transactionType;
@JsonProperty public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
@JsonProperty public String getIpAddress() { return ipAddress; }
public void setIpAddress(String ipAddress) { this.ipAddress = ipAddress; }
@JsonProperty public Double getAmount() { return amount; }
public void setAmount(Double amount) { this.amount = amount; }
@JsonProperty public String getTransactionType() { return transactionType; }
public void setTransactionType(String transactionType) { this.transactionType = transactionType; }
}
// src/main/java/com/example/nuclio/models/EnrichmentResponse.java
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
public class EnrichmentResponse {
private long requestId;
private long timestamp;
private boolean success;
private String errorMessage;
private String userData;
private boolean userDataFromCache;
private String geoData;
private String fraudCheckResult;
private String riskLevel;
private String businessUnit;
private String enrichmentVersion;
private long processingTimeMs;
// Getters and setters
@JsonProperty public long getRequestId() { return requestId; }
public void setRequestId(long requestId) { this.requestId = requestId; }
@JsonProperty public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
@JsonProperty public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
@JsonProperty public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
@JsonProperty public String getUserData() { return userData; }
public void setUserData(String userData) { this.userData = userData; }
@JsonProperty public boolean isUserDataFromCache() { return userDataFromCache; }
public void setUserDataFromCache(boolean userDataFromCache) { this.userDataFromCache = userDataFromCache; }
@JsonProperty public String getGeoData() { return geoData; }
public void setGeoData(String geoData) { this.geoData = geoData; }
@JsonProperty public String getFraudCheckResult() { return fraudCheckResult; }
public void setFraudCheckResult(String fraudCheckResult) { this.fraudCheckResult = fraudCheckResult; }
@JsonProperty public String getRiskLevel() { return riskLevel; }
public void setRiskLevel(String riskLevel) { this.riskLevel = riskLevel; }
@JsonProperty public String getBusinessUnit() { return businessUnit; }
public void setBusinessUnit(String businessUnit) { this.businessUnit = businessUnit; }
@JsonProperty public String getEnrichmentVersion() { return enrichmentVersion; }
public void setEnrichmentVersion(String enrichmentVersion) { this.enrichmentVersion = enrichmentVersion; }
@JsonProperty public long getProcessingTimeMs() { return processingTimeMs; }
public void setProcessingTimeMs(long processingTimeMs) { this.processingTimeMs = processingTimeMs; }
}
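
The user-data cache in DataEnrichmentFunction is a plain ConcurrentHashMap, so entries never expire and the map can grow without bound. A minimal sketch of a time-limited cache, assuming a hypothetical TtlCache class with an injectable clock so that expiry can be tested without sleeping:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical TTL cache for illustration; entries are dropped lazily on read.
final class TtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;

        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> map = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        map.put(key, new Entry<>(value, clock.getAsLong() + ttlMillis));
    }

    /** Returns the cached value, or null if the key is absent or expired. */
    V get(K key) {
        Entry<V> entry = map.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() >= entry.expiresAtMillis) {
            // Remove only if the mapping is still this entry, so a fresh put is not lost
            map.remove(key, entry);
            return null;
        }
        return entry.value;
    }
}
```

In the handler this could be constructed as new TtlCache<>(Duration.ofMinutes(5).toMillis(), System::currentTimeMillis). The sketch still has no size limit, since keys that are never read again are never evicted; a production setup would more likely use a bounded cache library.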
5. Performance-Optimized Batch Processor
// src/main/java/com/example/nuclio/handlers/BatchProcessor.java
package com.example.nuclio.handlers;
import com.example.nuclio.models.BatchRequest;
import com.example.nuclio.models.BatchResponse;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nuclio.Context;
import io.nuclio.Event;
import io.nuclio.EventHandler;
import io.nuclio.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
public class BatchProcessor implements EventHandler {
private static final Logger logger = LoggerFactory.getLogger(BatchProcessor.class);
private final ObjectMapper objectMapper;
private final ForkJoinPool processingPool;
public BatchProcessor() {
this.objectMapper = new ObjectMapper();
this.objectMapper.findAndRegisterModules();
// Custom thread pool for parallel processing
int parallelism = Integer.parseInt(
System.getenv().getOrDefault("BATCH_PARALLELISM", 
String.valueOf(Runtime.getRuntime().availableProcessors())));
this.processingPool = new ForkJoinPool(parallelism);
}
@Override
public Response handleEvent(Context context, Event event) {
long startTime = System.currentTimeMillis();
try {
// Parse batch request
BatchRequest batchRequest = objectMapper.readValue(
event.getBody(), BatchRequest.class);
logger.info("Processing batch with {} items", 
batchRequest.getItems().size());
// Process items in parallel
BatchResponse batchResponse = processBatchParallel(batchRequest);
long processingTime = System.currentTimeMillis() - startTime;
batchResponse.setProcessingTimeMs(processingTime);
batchResponse.setTotalItems(batchRequest.getItems().size());
String responseBody = objectMapper.writeValueAsString(batchResponse);
logger.info("Batch processed {} items in {}ms", 
batchRequest.getItems().size(), processingTime);
return Response.builder()
.body(responseBody)
.contentType("application/json")
.header("X-Processing-Time", processingTime + "ms")
.statusCode(200)
.build();
} catch (Exception e) {
logger.error("Batch processing failed", e);
return Response.builder()
.body("{\"error\": \"Batch processing failed\"}")
.contentType("application/json")
.statusCode(500)
.build();
}
}
private BatchResponse processBatchParallel(BatchRequest batchRequest) {
List<CompletableFuture<BatchResponse.ItemResult>> futures = 
batchRequest.getItems().stream()
.map(item -> CompletableFuture.supplyAsync(
() -> processItem(item), processingPool))
.collect(Collectors.toList());
// Wait for all completions
List<BatchResponse.ItemResult> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
BatchResponse response = new BatchResponse();
response.setResults(results);
// Calculate batch statistics
long successful = results.stream()
.filter(BatchResponse.ItemResult::isSuccess)
.count();
response.setSuccessfulItems(successful);
response.setFailedItems(results.size() - successful);
return response;
}
private BatchResponse.ItemResult processItem(BatchRequest.BatchItem item) {
BatchResponse.ItemResult result = new BatchResponse.ItemResult();
result.setItemId(item.getId());
result.setTimestamp(System.currentTimeMillis());
try {
// Simulate item processing
Thread.sleep(10); // Simulate 10ms processing time
// Transform data
String processedData = item.getData().toUpperCase();
result.setProcessedData(processedData);
result.setSuccess(true);
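} catch (InterruptedException e) {
// Restore the interrupt flag so callers can see that the thread was interrupted
Thread.currentThread().interrupt();
result.setSuccess(false);
result.setErrorMessage("Item processing interrupted");
} catch (Exception e) {
result.setSuccess(false);
result.setErrorMessage("Item processing failed");
}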
return result;
}
}
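
The BatchProcessor constructor passes BATCH_PARALLELISM straight to Integer.parseInt, which throws NumberFormatException on a malformed value, and ForkJoinPool rejects a parallelism of zero or less with IllegalArgumentException. Either would make the function fail at startup. A minimal sketch of a more forgiving parse, assuming a hypothetical helper class named PoolConfig:

```java
// Hypothetical helper for illustration.
final class PoolConfig {
    private PoolConfig() {}

    /** Parses a positive parallelism value, returning the fallback for missing or invalid input. */
    static int parseParallelism(String raw, int fallback) {
        if (raw == null || raw.isBlank()) {
            return fallback;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            return value > 0 ? value : fallback;
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

The constructor could then build the pool with new ForkJoinPool(PoolConfig.parseParallelism(System.getenv("BATCH_PARALLELISM"), Runtime.getRuntime().availableProcessors())). Silently falling back is a design choice; failing fast with a clear message is equally defensible if misconfiguration should block deployment.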
// Batch models (each class lives in its own file)
// src/main/java/com/example/nuclio/models/BatchRequest.java
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class BatchRequest {
private List<BatchItem> items;
@JsonProperty public List<BatchItem> getItems() { return items; }
public void setItems(List<BatchItem> items) { this.items = items; }
public static class BatchItem {
private String id;
private String data;
@JsonProperty public String getId() { return id; }
public void setId(String id) { this.id = id; }
@JsonProperty public String getData() { return data; }
public void setData(String data) { this.data = data; }
}
}
// src/main/java/com/example/nuclio/models/BatchResponse.java
package com.example.nuclio.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class BatchResponse {
private List<ItemResult> results;
private long totalItems;
private long successfulItems;
private long failedItems;
private long processingTimeMs;
@JsonProperty public List<ItemResult> getResults() { return results; }
public void setResults(List<ItemResult> results) { this.results = results; }
@JsonProperty public long getTotalItems() { return totalItems; }
public void setTotalItems(long totalItems) { this.totalItems = totalItems; }
@JsonProperty public long getSuccessfulItems() { return successfulItems; }
public void setSuccessfulItems(long successfulItems) { this.successfulItems = successfulItems; }
@JsonProperty public long getFailedItems() { return failedItems; }
public void setFailedItems(long failedItems) { this.failedItems = failedItems; }
@JsonProperty public long getProcessingTimeMs() { return processingTimeMs; }
public void setProcessingTimeMs(long processingTimeMs) { this.processingTimeMs = processingTimeMs; }
public static class ItemResult {
private String itemId;
private boolean success;
private String errorMessage;
private String processedData;
private long timestamp;
@JsonProperty public String getItemId() { return itemId; }
public void setItemId(String itemId) { this.itemId = itemId; }
@JsonProperty public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
@JsonProperty public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
@JsonProperty public String getProcessedData() { return processedData; }
public void setProcessedData(String processedData) { this.processedData = processedData; }
@JsonProperty public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
}

Nuclio Configuration

1. Function Configuration (function.yaml)
# function.yaml
apiVersion: "nuclio.io/v1"
kind: "Function"
metadata:
  name: "java-json-processor"
  namespace: "nuclio"
  labels:
    nuclio.io/project-name: "high-performance-java"
spec:
  description: "High-performance JSON processing function"
  handler: "com.example.nuclio.handlers.JsonProcessor"
  runtime: "java"
  minReplicas: 1
  maxReplicas: 10
  targetCPU: 75
  resources:
    requests:
      memory: "256Mi"
      cpu: "200m"
    limits:
      memory: "512Mi"
      cpu: "1000m"
  build:
    commands:
    - "apk --no-cache add curl"
    codeEntryType: "image"
    image: "nuclio/jvm-runner:1.7.0"
  triggers:
    http:
      maxWorkers: 32
      kind: "http"
      name: "http"
      attributes:
        maxRequestBodySize: 1048576  # 1MB
  env:
  - name: "JAVA_OPTS"
    value: "-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Xmx384m"
  - name: "NUCLIO_HANDLER"
    value: "com.example.nuclio.handlers.JsonProcessor"
  - name: "LOG_LEVEL"
    value: "INFO"
  readinessTimeoutSeconds: 60
  avatar: ""
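
The JAVA_OPTS value above sets both -XX:MaxRAMPercentage and -Xmx. When an explicit -Xmx is present, it determines the maximum heap and the percentage flag has no effect, so it can help to confirm at startup what the JVM actually received. A minimal sketch that reports the effective limits, assuming a hypothetical HeapReport class that a handler constructor could call:

```java
// Hypothetical startup check for illustration.
final class HeapReport {
    private HeapReport() {}

    /** Maximum heap the JVM will attempt to use, in MiB. */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    /** One-line summary suitable for a startup log message. */
    static String describe() {
        return String.format("max heap: %d MiB, processors: %d",
                maxHeapMiB(), Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Logging describe() once in the handler constructor makes it easy to compare the container's memory limit with the heap the JVM settled on.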
2. Multi-Function Configuration
# multi-function.yaml
apiVersion: "nuclio.io/v1"
kind: "Function"
metadata:
  name: "java-stream-processor"
  namespace: "nuclio"
spec:
  handler: "com.example.nuclio.handlers.StreamProcessor"
  runtime: "java"
  minReplicas: 2
  maxReplicas: 20
  targetCPU: 80
  resources:
    limits:
      memory: "1Gi"
      cpu: "2000m"
  build:
    commands:
    - "apk add --no-cache bash"
    codeEntryType: "image"
  triggers:
    http:
      kind: "http"
      maxWorkers: 64
    kafka:
      kind: "kafka"
      url: "my-kafka-broker:9092"
      attributes:
        topics: ["stream-events"]
        consumerGroup: "nuclio-stream-processor"
        initialOffset: "latest"
  env:
  - name: "JAVA_OPTS"
    value: "-XX:+UseContainerSupport -XX:MaxRAMPercentage=80.0 -Xmx800m"
  - name: "NUCLIO_HANDLER"
    value: "com.example.nuclio.handlers.StreamProcessor"
---
apiVersion: "nuclio.io/v1"
kind: "Function"
metadata:
  name: "java-data-enrichment"
  namespace: "nuclio"
spec:
  handler: "com.example.nuclio.handlers.DataEnrichmentFunction"
  runtime: "java"
  minReplicas: 3
  maxReplicas: 15
  resources:
    limits:
      memory: "512Mi"
      cpu: "1000m"
  env:
  - name: "USER_SERVICE_URL"
    value: "http://user-service:8080"
  - name: "GEO_SERVICE_URL"
    value: "http://geo-service:8080"
  - name: "FRAUD_SERVICE_URL"
    value: "http://fraud-service:8080"
  - name: "JAVA_OPTS"
    value: "-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Xmx384m"
3. Dockerfile for Custom Build
# Dockerfile
FROM nuclio/jvm-runner:1.7.0
# Install additional dependencies if needed
RUN apk add --no-cache curl jq
# Copy the JAR file
COPY target/nuclio-java-functions-1.0.0.jar /home/nuclio/bin/
# Set environment variables
ENV NUCLIO_HANDLER="com.example.nuclio.handlers.JsonProcessor"
ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -Xmx384m"
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/ready || exit 1

Deployment and Management

1. Nuclio CLI Commands
# The Nuclio command-line client is named nuctl
# Deploy function
nuctl deploy -p /path/to/function.yaml
# Deploy with explicit flags
nuctl deploy java-json-processor \
  --runtime java \
  --handler com.example.nuclio.handlers.JsonProcessor \
  --build-local \
  --source /path/to/code \
  --env JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0"
# Get function information
nuctl get function java-json-processor
# Invoke function
nuctl invoke java-json-processor \
  --method POST \
  --body '{"data": "test data"}'
# Delete function
nuctl delete function java-json-processor
2. Performance Testing Script
// src/main/java/com/example/nuclio/utils/PerformanceTester.java
package com.example.nuclio.utils;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class PerformanceTester {
private final String functionUrl;
private final HttpClient httpClient;
private final ExecutorService executorService;
public PerformanceTester(String functionUrl, int threadCount) {
this.functionUrl = functionUrl;
// Requests are issued from executorService; the client keeps its default internal executor
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
this.executorService = Executors.newFixedThreadPool(threadCount);
}
// Call once testing is finished; the pool's non-daemon threads otherwise keep the JVM alive
public void shutdown() {
executorService.shutdown();
}
public PerformanceResult runTest(int totalRequests, int concurrency, String payload) {
AtomicInteger successfulRequests = new AtomicInteger(0);
AtomicInteger failedRequests = new AtomicInteger(0);
// Worker threads append concurrently, so the list must be thread-safe
List<Long> responseTimes = Collections.synchronizedList(new ArrayList<>());
long startTime = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < totalRequests; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
long requestStart = System.currentTimeMillis();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(functionUrl))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(payload))
.timeout(Duration.ofSeconds(10))
.build();
HttpResponse<String> response = httpClient.send(
request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
successfulRequests.incrementAndGet();
} else {
failedRequests.incrementAndGet();
}
long responseTime = System.currentTimeMillis() - requestStart;
responseTimes.add(responseTime);
} catch (Exception e) {
failedRequests.incrementAndGet();
}
}, executorService);
futures.add(future);
// Control concurrency
if (futures.size() >= concurrency) {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.clear();
}
}
// Wait for remaining requests
if (!futures.isEmpty()) {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
long totalTime = System.currentTimeMillis() - startTime;
return new PerformanceResult(
totalRequests,
successfulRequests.get(),
failedRequests.get(),
totalTime,
responseTimes
);
}
public static class PerformanceResult {
private final int totalRequests;
private final int successfulRequests;
private final int failedRequests;
private final long totalTimeMs;
private final List<Long> responseTimes;
public PerformanceResult(int totalRequests, int successfulRequests, 
int failedRequests, long totalTimeMs, 
List<Long> responseTimes) {
this.totalRequests = totalRequests;
this.successfulRequests = successfulRequests;
this.failedRequests = failedRequests;
this.totalTimeMs = totalTimeMs;
this.responseTimes = responseTimes;
}
public void printSummary() {
// Guard against a zero elapsed time on very short runs
double requestsPerSecond = totalTimeMs > 0 ? (successfulRequests * 1000.0) / totalTimeMs : 0.0;
double avgResponseTime = responseTimes.stream()
.mapToLong(Long::longValue)
.average()
.orElse(0.0);
long p95 = calculatePercentile(95);
long p99 = calculatePercentile(99);
System.out.println("=== Performance Test Results ===");
System.out.printf("Total Requests: %d\n", totalRequests);
System.out.printf("Successful: %d\n", successfulRequests);
System.out.printf("Failed: %d\n", failedRequests);
System.out.printf("Success Rate: %.2f%%\n", 
(successfulRequests * 100.0) / totalRequests);
System.out.printf("Requests/sec: %.2f\n", requestsPerSecond);
System.out.printf("Average Response Time: %.2f ms\n", avgResponseTime);
System.out.printf("95th Percentile: %d ms\n", p95);
System.out.printf("99th Percentile: %d ms\n", p99);
System.out.printf("Total Time: %d ms\n", totalTimeMs);
}
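private long calculatePercentile(int percentile) {
// No samples exist when every request failed before completing
if (responseTimes.isEmpty()) {
return 0;
}
List<Long> sorted = new ArrayList<>(responseTimes);
sorted.sort(Long::compareTo);
// Nearest-rank method: smallest sample with at least percentile% of samples at or below it
int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
return sorted.get(Math.max(0, index));
}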
}
}
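To make the percentile figures in the summary concrete, here is a small worked example of the nearest-rank formula that calculatePercentile uses. The `PercentileExample` class and its sample latencies are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: applies the nearest-rank formula to a fixed set of latencies.
final class PercentileExample {

    // Returns the nearest-rank percentile, or 0 when there are no samples
    static long percentile(List<Long> samples, int percentile) {
        if (samples.isEmpty()) {
            return 0;
        }
        List<Long> sorted = new ArrayList<>(samples);
        sorted.sort(Long::compareTo);
        int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(0, index));
    }

    public static void main(String[] args) {
        // Ten hypothetical response times in milliseconds, one of them an outlier
        List<Long> latenciesMs = List.of(12L, 15L, 11L, 14L, 250L, 13L, 16L, 12L, 18L, 17L);
        System.out.println("p50 = " + percentile(latenciesMs, 50) + " ms"); // p50 = 14 ms
        System.out.println("p95 = " + percentile(latenciesMs, 95) + " ms"); // p95 = 250 ms
    }
}
```

With only ten samples the single 250 ms outlier becomes the 95th percentile while the median stays at 14 ms, which is why tail percentiles need a large request count to be meaningful.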

Best Practices for High Performance

1. Memory Management
// src/main/java/com/example/nuclio/utils/MemoryOptimizer.java
package com.example.nuclio.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
public class MemoryOptimizer {
private static final Logger logger = LoggerFactory.getLogger(MemoryOptimizer.class);
private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
public static void logMemoryUsage() {
MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage();
logger.info("Heap Memory - Used: {} MB, Max: {} MB", 
heapMemoryUsage.getUsed() / 1024 / 1024,
heapMemoryUsage.getMax() / 1024 / 1024);
logger.info("Non-Heap Memory - Used: {} MB", 
nonHeapMemoryUsage.getUsed() / 1024 / 1024);
}
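public static boolean isMemoryPressureHigh() {
MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
long maxHeap = heapMemoryUsage.getMax();
// getMax() returns -1 when the maximum heap size is undefined
if (maxHeap <= 0) {
return false;
}
double usageRatio = (double) heapMemoryUsage.getUsed() / maxHeap;
return usageRatio > 0.8; // above 80% of the maximum heap
}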
public static void suggestGCOptimizations() {
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
logger.info("JVM Memory - Max: {} MB, Total: {} MB, Free: {} MB",
maxMemory / 1024 / 1024,
totalMemory / 1024 / 1024,
freeMemory / 1024 / 1024);
if (maxMemory < 512 * 1024 * 1024) { // 512MB
logger.warn("Consider increasing JVM heap size for better performance");
}
}
}
2. Connection Pooling
// src/main/java/com/example/nuclio/utils/HttpClientManager.java
package com.example.nuclio.utils;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
public final class HttpClientManager {
// Created once at class load, so concurrent callers always share a single client
private static final HttpClient SHARED_HTTP_CLIENT = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.executor(Executors.newFixedThreadPool(50))
.followRedirects(HttpClient.Redirect.NORMAL)
.build();
private HttpClientManager() {
// Utility class; not meant to be instantiated
}
public static HttpClient getSharedClient() {
return SHARED_HTTP_CLIENT;
}
}

Monitoring and Observability

1. Metrics Collection
// src/main/java/com/example/nuclio/utils/FunctionMetrics.java
package com.example.nuclio.utils;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class FunctionMetrics {
private final MeterRegistry meterRegistry;
private final ConcurrentHashMap<String, Counter> counters;
private final ConcurrentHashMap<String, Timer> timers;
public FunctionMetrics() {
this.meterRegistry = new SimpleMeterRegistry();
this.counters = new ConcurrentHashMap<>();
this.timers = new ConcurrentHashMap<>();
}
public void recordInvocation(String functionName, long durationMs, boolean success) {
getCounter(functionName + ".invocations.total").increment();
if (success) {
getCounter(functionName + ".invocations.success").increment();
} else {
getCounter(functionName + ".invocations.failed").increment();
}
getTimer(functionName + ".processing.time").record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordError(String functionName, String errorType) {
getCounter(functionName + ".errors." + errorType).increment();
}
private Counter getCounter(String name) {
return counters.computeIfAbsent(name, key -> 
Counter.builder(key)
.description("Counter for " + key)
.register(meterRegistry)
);
}
private Timer getTimer(String name) {
return timers.computeIfAbsent(name, key -> 
Timer.builder(key)
.description("Timer for " + key)
.register(meterRegistry)
);
}
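public void printMetrics() {
counters.values().forEach(counter -> 
System.out.println(counter.getId().getName() + ": " + counter.count()));
// Timers hold the recorded processing times, so report them as well
timers.values().forEach(timer -> 
System.out.println(timer.getId().getName() + ": count=" + timer.count()
+ ", mean=" + timer.mean(TimeUnit.MILLISECONDS) + " ms"));
}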
}
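One way to feed recordInvocation is to wrap each handler call in a small timing helper. The sketch below uses only the standard library: `TimedInvoker` and `InvocationRecorder` are hypothetical names, and the recorder's single method mirrors the `recordInvocation(String, long, boolean)` signature above, so a FunctionMetrics instance could be passed in as `metrics::recordInvocation`.

```java
import java.util.concurrent.Callable;

// Illustrative sketch: times a unit of work and reports duration and outcome.
final class TimedInvoker {

    // Hypothetical stand-in matching recordInvocation(functionName, durationMs, success)
    @FunctionalInterface
    interface InvocationRecorder {
        void record(String functionName, long durationMs, boolean success);
    }

    private final InvocationRecorder recorder;

    TimedInvoker(InvocationRecorder recorder) {
        this.recorder = recorder;
    }

    // Runs the work, records how long it took and whether it succeeded,
    // then returns the result or rethrows the original exception.
    <T> T invoke(String functionName, Callable<T> work) throws Exception {
        long start = System.nanoTime();
        boolean success = false;
        try {
            T result = work.call();
            success = true;
            return result;
        } finally {
            long durationMs = (System.nanoTime() - start) / 1_000_000;
            recorder.record(functionName, durationMs, success);
        }
    }

    public static void main(String[] args) throws Exception {
        TimedInvoker invoker = new TimedInvoker((name, ms, ok) ->
                System.out.println(name + " took " + ms + " ms, success=" + ok));
        String result = invoker.invoke("json-processor", () -> "processed");
        System.out.println(result);
    }
}
```

Recording in a finally block means failed invocations are timed too, so error paths show up in the same timer as successful ones.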

Conclusion

Nuclio high-performance Java functions provide:

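  • Performance: fast startup and high event throughput (this guide's headline figures are sub-100ms cold starts and millions of events/sec; verify both against your own workload)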
  • Real-time Processing: Stream and event-driven architectures
  • Resource Efficiency: Optimized memory and CPU usage
  • Scalability: Automatic scaling based on load
  • Flexible Triggers: HTTP, message queues, cron, streams

The patterns shown above (container-aware JVM settings, a shared HTTP client, lightweight metrics, and load testing before rollout) give you a starting point for Java functions that handle real-time data processing, batch operations, and data enrichment. Measure each change against your own workload to confirm the gain.
