Titliel – Cilium Hubble Integration in Java

Article: Implementing Cilium Hubble Observability with Java

Cilium Hubble is a fully distributed networking and security observability platform for cloud-native workloads. It's built on top of Cilium and eBPF to enable deep visibility into the communication and behavior of services and applications.

Why Cilium Hubble Matters

Hubble provides critical insights for:

  • Network Security Monitoring: Real-time detection of network-based threats
  • Service Dependency Mapping: Automatic discovery of service relationships
  • Performance Monitoring: Latency and throughput metrics for network calls
  • Compliance Auditing: Detailed flow logs for regulatory requirements
  • Troubleshooting: Rapid diagnosis of network and security issues

Java Implementation

Here's a comprehensive Java solution for integrating with Cilium Hubble:

Core Hubble Client Implementation

package com.titliel.hubble;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.hubble.api.GetFlowsRequest;
import org.hubble.api.GetFlowsResponse;
import org.hubble.api.GetNodesRequest;
import org.hubble.api.GetNodesResponse;
import org.hubble.api.GetNamespacesRequest;
import org.hubble.api.GetNamespacesResponse;
import org.hubble.api.HubbleServiceGrpc;
import org.hubble.api.Node;
import org.hubble.api.Flow;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.*;
/**
* Titliel Hubble Client - Java integration for Cilium Hubble observability
*/
public class TitlielHubbleClient implements AutoCloseable {
private final ManagedChannel channel;
private final HubbleServiceGrpc.HubbleServiceBlockingStub blockingStub;
private final HubbleServiceGrpc.HubbleServiceStub asyncStub;
private final String hubbleServer;
private final int hubblePort;
public TitlielHubbleClient(String server, int port) {
this.hubbleServer = server;
this.hubblePort = port;
this.channel = ManagedChannelBuilder.forAddress(server, port)
.usePlaintext()
.build();
this.blockingStub = HubbleServiceGrpc.newBlockingStub(channel);
this.asyncStub = HubbleServiceGrpc.newStub(channel);
}
public TitlielHubbleClient(String server) {
this(server, 4245); // Default Hubble port
}
/**
* Get real-time flow stream from Hubble
*/
public void streamFlows(FlowObserver observer, FlowFilter... filters) {
GetFlowsRequest.Builder requestBuilder = GetFlowsRequest.newBuilder();
// Apply filters if provided
for (FlowFilter filter : filters) {
requestBuilder.addBlacklist(filter.toProtoFilter());
}
GetFlowsRequest request = requestBuilder.build();
asyncStub.getFlows(request, new StreamObserver<GetFlowsResponse>() {
@Override
public void onNext(GetFlowsResponse response) {
observer.onFlow(response.getFlow());
}
@Override
public void onError(Throwable t) {
observer.onError(new HubbleException("Flow stream error", t));
}
@Override
public void onCompleted() {
observer.onCompleted();
}
});
}
/**
* Get historical flows with pagination
*/
public List<Flow> getHistoricalFlows(FlowQuery query) {
List<Flow> flows = new ArrayList<>();
GetFlowsRequest request = query.toRequest();
Iterator<GetFlowsResponse> responses = blockingStub.getFlows(request);
int count = 0;
while (responses.hasNext() && count < query.getLimit()) {
GetFlowsResponse response = responses.next();
flows.add(response.getFlow());
count++;
}
return flows;
}
/**
* Get service dependency graph
*/
public ServiceGraph getServiceGraph() {
GetNodesRequest request = GetNodesRequest.newBuilder().build();
GetNodesResponse response = blockingStub.getNodes(request);
ServiceGraph graph = new ServiceGraph();
for (Node node : response.getNodesList()) {
graph.addNode(node);
}
return graph;
}
/**
* Get all monitored namespaces
*/
public List<String> getMonitoredNamespaces() {
GetNamespacesRequest request = GetNamespacesRequest.newBuilder().build();
GetNamespacesResponse response = blockingStub.getNamespaces(request);
return new ArrayList<>(response.getNamespacesList());
}
@Override
public void close() {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
try {
if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
channel.shutdownNow();
}
} catch (InterruptedException e) {
channel.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

Flow Processing and Analysis

package com.titliel.hubble;
import org.hubble.api.Flow;
import org.hubble.api.TCPFlags;
import org.hubble.api.IP;
import org.hubble.api.Ethernet;
import org.hubble.api.Service;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Advanced flow processor for security and performance analysis
*/
public class TitlielFlowProcessor {
private final Map<String, FlowMetrics> serviceMetrics = new ConcurrentHashMap<>();
private final Map<String, SecurityAlert> securityAlerts = new ConcurrentHashMap<>();
private final Set<String> allowedServices = ConcurrentHashMap.newKeySet();
/**
* Process incoming flow for security analysis
*/
public SecurityAnalysis analyzeFlow(Flow flow) {
SecurityAnalysis analysis = new SecurityAnalysis(flow);
// Check for suspicious patterns
checkPortScanning(flow, analysis);
checkUnusualProtocols(flow, analysis);
checkDataExfiltration(flow, analysis);
checkPolicyViolations(flow, analysis);
// Update metrics
updateServiceMetrics(flow);
return analysis;
}
private void checkPortScanning(Flow flow, SecurityAnalysis analysis) {
// Detect rapid connection attempts to multiple ports
String source = getSourceIdentifier(flow);
FlowMetrics metrics = serviceMetrics.computeIfAbsent(source, k -> new FlowMetrics());
metrics.recordConnectionAttempt();
if (metrics.getConnectionAttemptsPerMinute() > 100) {
analysis.addAlert(SecurityAlert.portScanningAlert(flow, 
metrics.getConnectionAttemptsPerMinute()));
}
}
private void checkUnusualProtocols(Flow flow, SecurityAnalysis analysis) {
// Check for non-standard protocols or ports
if (flow.hasL4()) {
int destPort = getDestinationPort(flow);
String protocol = getProtocol(flow);
if (isSuspiciousPort(destPort) && !isAllowedService(flow)) {
analysis.addAlert(SecurityAlert.suspiciousProtocolAlert(flow, protocol, destPort));
}
}
}
private void checkDataExfiltration(Flow flow, SecurityAnalysis analysis) {
// Detect large data transfers to external IPs
if (flow.hasL4() && flow.getL4().hasTCP()) {
long bytesTransferred = getBytesTransferred(flow);
if (bytesTransferred > 100 * 1024 * 1024) { // 100MB threshold
if (isExternalDestination(flow)) {
analysis.addAlert(SecurityAlert.dataExfiltrationAlert(flow, bytesTransferred));
}
}
}
}
private void checkPolicyViolations(Flow flow, SecurityAnalysis analysis) {
// Check against network policies
if (!isAllowedCommunication(flow)) {
analysis.addAlert(SecurityAlert.policyViolationAlert(flow));
}
}
private void updateServiceMetrics(Flow flow) {
String serviceKey = getServiceKey(flow);
FlowMetrics metrics = serviceMetrics.computeIfAbsent(serviceKey, k -> new FlowMetrics());
metrics.recordFlow(flow);
metrics.updateLatency(calculateLatency(flow));
}
// Utility methods
private String getSourceIdentifier(Flow flow) {
return flow.getIp().getSource() + ":" + getSourcePort(flow);
}
private String getServiceKey(Flow flow) {
return flow.getSource().getService().getName() + "->" + 
flow.getDestination().getService().getName();
}
private boolean isExternalDestination(Flow flow) {
// Implement logic to check if destination is external
return flow.getIp().getDestination().startsWith("192.168.") || 
flow.getIp().getDestination().startsWith("10.");
}
private boolean isAllowedCommunication(Flow flow) {
// Implement policy check logic
return allowedServices.contains(getServiceKey(flow));
}
private int getDestinationPort(Flow flow) {
if (flow.hasL4() && flow.getL4().hasTCP()) {
return flow.getL4().getTCP().getDestinationPort();
}
return -1;
}
// Getters for flow analysis
public Map<String, FlowMetrics> getServiceMetrics() {
return Collections.unmodifiableMap(serviceMetrics);
}
public Collection<SecurityAlert> getSecurityAlerts() {
return Collections.unmodifiableCollection(securityAlerts.values());
}
}

Security Analysis Data Classes

package com.titliel.hubble;
import org.hubble.api.Flow;
import java.time.Instant;
import java.util.*;
/**
* Security analysis result for a flow
*/
public class SecurityAnalysis {
private final Flow flow;
private final Instant analyzedAt;
private final List<SecurityAlert> alerts;
private final RiskLevel riskLevel;
public SecurityAnalysis(Flow flow) {
this.flow = flow;
this.analyzedAt = Instant.now();
this.alerts = new ArrayList<>();
this.riskLevel = RiskLevel.LOW;
}
public void addAlert(SecurityAlert alert) {
this.alerts.add(alert);
// Recalculate risk level
this.riskLevel = calculateOverallRisk();
}
private RiskLevel calculateOverallRisk() {
return alerts.stream()
.map(SecurityAlert::getRiskLevel)
.max(Comparator.naturalOrder())
.orElse(RiskLevel.LOW);
}
// Getters
public Flow getFlow() { return flow; }
public Instant getAnalyzedAt() { return analyzedAt; }
public List<SecurityAlert> getAlerts() { return Collections.unmodifiableList(alerts); }
public RiskLevel getRiskLevel() { return riskLevel; }
public boolean hasAlerts() { return !alerts.isEmpty(); }
}
/**
* Security alert representation
*/
public class SecurityAlert {
public enum AlertType {
PORT_SCANNING, SUSPICIOUS_PROTOCOL, DATA_EXFILTRATION, POLICY_VIOLATION
}
private final String id;
private final AlertType type;
private final String description;
private final RiskLevel riskLevel;
private final Flow relatedFlow;
private final Instant detectedAt;
private final Map<String, Object> metadata;
public SecurityAlert(AlertType type, String description, RiskLevel riskLevel, Flow flow) {
this.id = UUID.randomUUID().toString();
this.type = type;
this.description = description;
this.riskLevel = riskLevel;
this.relatedFlow = flow;
this.detectedAt = Instant.now();
this.metadata = new HashMap<>();
}
// Factory methods for common alert types
public static SecurityAlert portScanningAlert(Flow flow, int attempts) {
SecurityAlert alert = new SecurityAlert(
AlertType.PORT_SCANNING,
String.format("Port scanning detected: %d attempts per minute", attempts),
RiskLevel.HIGH,
flow
);
alert.metadata.put("attempts", attempts);
return alert;
}
public static SecurityAlert suspiciousProtocolAlert(Flow flow, String protocol, int port) {
SecurityAlert alert = new SecurityAlert(
AlertType.SUSPICIOUS_PROTOCOL,
String.format("Suspicious protocol usage: %s on port %d", protocol, port),
RiskLevel.MEDIUM,
flow
);
alert.metadata.put("protocol", protocol);
alert.metadata.put("port", port);
return alert;
}
// Getters
public String getId() { return id; }
public AlertType getType() { return type; }
public String getDescription() { return description; }
public RiskLevel getRiskLevel() { return riskLevel; }
public Flow getRelatedFlow() { return relatedFlow; }
public Instant getDetectedAt() { return detectedAt; }
public Map<String, Object> getMetadata() { return Collections.unmodifiableMap(metadata); }
}
/**
* Risk level enumeration
*/
public enum RiskLevel {
LOW, MEDIUM, HIGH, CRITICAL
}
/**
* Flow metrics for performance monitoring
*/
public class FlowMetrics {
private long totalFlows = 0;
private long totalBytes = 0;
private long connectionAttempts = 0;
private double averageLatency = 0.0;
private final List<Double> recentLatencies = new ArrayList<>();
private final Instant created = Instant.now();
public void recordFlow(Flow flow) {
totalFlows++;
if (flow.hasL4() && flow.getL4().hasTCP()) {
totalBytes += flow.getL4().getTCP().getBytes();
}
}
public void recordConnectionAttempt() {
connectionAttempts++;
}
public void updateLatency(double latency) {
recentLatencies.add(latency);
// Keep only last 100 measurements
if (recentLatencies.size() > 100) {
recentLatencies.remove(0);
}
averageLatency = recentLatencies.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
}
public long getConnectionAttemptsPerMinute() {
long minutes = java.time.Duration.between(created, Instant.now()).toMinutes();
return minutes > 0 ? connectionAttempts / minutes : connectionAttempts;
}
// Getters
public long getTotalFlows() { return totalFlows; }
public long getTotalBytes() { return totalBytes; }
public double getAverageLatency() { return averageLatency; }
}

Hubble Monitoring Service

package com.titliel.hubble;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Continuous Hubble monitoring service
*/
public class TitlielHubbleMonitor {
private final TitlielHubbleClient hubbleClient;
private final TitlielFlowProcessor flowProcessor;
private final ScheduledExecutorService scheduler;
private final Consumer<SecurityAlert> alertHandler;
private volatile boolean monitoring = false;
public TitlielHubbleMonitor(String hubbleServer, Consumer<SecurityAlert> alertHandler) {
this.hubbleClient = new TitlielHubbleClient(hubbleServer);
this.flowProcessor = new TitlielFlowProcessor();
this.scheduler = Executors.newScheduledThreadPool(2);
this.alertHandler = alertHandler;
}
/**
* Start continuous flow monitoring
*/
public void startMonitoring() {
if (monitoring) {
return;
}
monitoring = true;
// Start real-time flow streaming
hubbleClient.streamFlows(new FlowObserver() {
@Override
public void onFlow(Flow flow) {
SecurityAnalysis analysis = flowProcessor.analyzeFlow(flow);
if (analysis.hasAlerts()) {
for (SecurityAlert alert : analysis.getAlerts()) {
alertHandler.accept(alert);
}
}
}
@Override
public void onError(HubbleException error) {
System.err.println("Flow streaming error: " + error.getMessage());
// Implement retry logic
}
@Override
public void onCompleted() {
System.out.println("Flow streaming completed");
}
});
// Schedule periodic metrics collection
scheduler.scheduleAtFixedRate(this::collectMetrics, 1, 1, TimeUnit.MINUTES);
}
/**
* Stop monitoring
*/
public void stopMonitoring() {
monitoring = false;
scheduler.shutdown();
hubbleClient.close();
}
private void collectMetrics() {
try {
ServiceGraph graph = hubbleClient.getServiceGraph();
Map<String, FlowMetrics> metrics = flowProcessor.getServiceMetrics();
// Report metrics (could be sent to monitoring system)
System.out.println("=== Hubble Metrics Report ===");
metrics.forEach((service, metric) -> {
System.out.printf("Service: %s, Flows: %d, Avg Latency: %.2fms%n",
service, metric.getTotalFlows(), metric.getAverageLatency());
});
} catch (Exception e) {
System.err.println("Metrics collection error: " + e.getMessage());
}
}
/**
* Get current security posture summary
*/
public SecurityPosture getSecurityPosture() {
Collection<SecurityAlert> alerts = flowProcessor.getSecurityAlerts();
Map<String, FlowMetrics> metrics = flowProcessor.getServiceMetrics();
return new SecurityPosture(alerts, metrics);
}
}

Usage Example

package com.titliel.hubble.demo;
import com.titliel.hubble.*;
/**
* Demo application showing Titliel Hubble integration
*/
public class HubbleIntegrationDemo {
public static void main(String[] args) {
// Create Hubble monitor
TitlielHubbleMonitor monitor = new TitlielHubbleMonitor(
"hubble.local", 
HubbleIntegrationDemo::handleSecurityAlert
);
// Start monitoring
monitor.startMonitoring();
// Keep monitoring for 5 minutes
try {
Thread.sleep(300000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Get security posture
SecurityPosture posture = monitor.getSecurityPosture();
System.out.println("Security Posture: " + posture.getOverallRisk());
// Stop monitoring
monitor.stopMonitoring();
}
private static void handleSecurityAlert(SecurityAlert alert) {
System.out.println("🚨 SECURITY ALERT: " + alert.getDescription());
System.out.println("   Risk Level: " + alert.getRiskLevel());
System.out.println("   Flow: " + alert.getRelatedFlow().getSource() + 
" -> " + alert.getRelatedFlow().getDestination());
// In production, this could:
// - Send to SIEM
// - Trigger automated response
// - Notify security team
// - Create JIRA ticket
}
}

Maven Dependencies

<dependencies>
<!-- Hubble API Protocol Buffers -->
<dependency>
<groupId>com.github.cilium</groupId>
<artifactId>hubble-api</artifactId>
<version>0.11.0</version>
</dependency>
<!-- gRPC dependencies -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.54.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.54.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.54.0</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies>

Key Features

  1. Real-time Flow Monitoring: Stream and analyze network flows in real-time
  2. Security Analytics: Detect threats like port scanning and data exfiltration
  3. Performance Metrics: Track latency and throughput between services
  4. Service Dependency Mapping: Automatically discover service relationships
  5. Alerting Integration: Integrate with existing security and monitoring systems

Best Practices

  • Run Hubble monitoring as a sidecar in your Kubernetes cluster
  • Implement proper credential management for Hubble API access
  • Use flow sampling in high-traffic environments
  • Integrate with your existing SIEM and alerting systems
  • Regularly update detection rules based on new threat intelligence

This Titliel Hubble integration provides enterprise-grade network observability and security monitoring for your Java applications in cloud-native environments.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper