Kafka Log Ingestion Pipeline in Java

1. Core Configuration and Dependencies

// pom.xml dependencies
/*
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.6</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.14.2</version>
    </dependency>
</dependencies>
*/
// KafkaConfig.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.Properties;

public class KafkaConfig {

    // Producer Configuration
    public static Properties getProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // required when idempotence is enabled
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // no duplicates on broker retry
        return props;
    }

    // Consumer Configuration
    public static Properties getConsumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed manually
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        return props;
    }

    // High-throughput consumer config for log ingestion
    public static Properties getHighThroughputConsumerConfig(String bootstrapServers, String groupId) {
        Properties props = getConsumerProperties(bootstrapServers, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576); // 1MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
        return props;
    }
}
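
Before wiring producers and consumers, the topics must exist. Below is a minimal sketch using Kafka's AdminClient; the topic names match the demo in section 8, but the partition and replication counts are illustrative assumptions, not requirements of the pipeline.

// TopicSetup.java -- illustrative helper, not part of the pipeline proper
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions caps the consumer group at 6 parallel consumers; replication
            // factor 1 suits a single-broker dev setup (use 3 on a real cluster)
            admin.createTopics(List.of(
                    new NewTopic("raw-logs", 6, (short) 1),
                    new NewTopic("processed-logs", 6, (short) 1)
            )).all().get(); // block until the broker acknowledges creation
        }
    }
}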

2. Log Model and Serialization

// LogEntry.java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class LogEntry {
    private String id;
    private String timestamp;
    private String level;
    private String logger;
    private String message;
    private String thread;
    private String application;
    private String environment;
    private String hostname;
    private Map<String, Object> context;
    private String stackTrace;
    private String sourceFile;
    private String sourceLine;

    // ObjectMapper is thread-safe and expensive to create, so share one instance
    private static final ObjectMapper mapper = new ObjectMapper();

    public LogEntry() {
        this.timestamp = Instant.now().toString();
        this.context = new HashMap<>();
    }

    public LogEntry(String level, String logger, String message) {
        this();
        this.level = level;
        this.logger = logger;
        this.message = message;
    }

    // Serialization methods
    public byte[] toBytes() {
        try {
            return mapper.writeValueAsBytes(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize log entry", e);
        }
    }

    public static LogEntry fromBytes(byte[] data) {
        try {
            return mapper.readValue(data, LogEntry.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize log entry", e);
        }
    }

    public String toJson() {
        try {
            return mapper.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize log entry to JSON", e);
        }
    }

    // Builder pattern for fluent creation
    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private final LogEntry entry;

        public Builder() {
            this.entry = new LogEntry();
        }

        public Builder level(String level) {
            entry.level = level;
            return this;
        }

        public Builder logger(String logger) {
            entry.logger = logger;
            return this;
        }

        public Builder message(String message) {
            entry.message = message;
            return this;
        }

        public Builder application(String application) {
            entry.application = application;
            return this;
        }

        public Builder environment(String environment) {
            entry.environment = environment;
            return this;
        }

        public Builder hostname(String hostname) {
            entry.hostname = hostname;
            return this;
        }

        public Builder context(String key, Object value) {
            entry.context.put(key, value);
            return this;
        }

        public Builder stackTrace(String stackTrace) {
            entry.stackTrace = stackTrace;
            return this;
        }

        public LogEntry build() {
            if (entry.id == null) {
                entry.id = generateId();
            }
            return entry;
        }

        private String generateId() {
            // hostname + random UUID; a millisecond timestamp alone can collide when
            // many entries are created on the same host in the same instant, which
            // would silently overwrite documents downstream (the ID is used as the
            // Elasticsearch document ID)
            return String.format("%s-%s",
                    entry.hostname != null ? entry.hostname : "unknown",
                    UUID.randomUUID());
        }
    }

    // Getters and Setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    public String getLevel() { return level; }
    public void setLevel(String level) { this.level = level; }
    public String getLogger() { return logger; }
    public void setLogger(String logger) { this.logger = logger; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public String getThread() { return thread; }
    public void setThread(String thread) { this.thread = thread; }
    public String getApplication() { return application; }
    public void setApplication(String application) { this.application = application; }
    public String getEnvironment() { return environment; }
    public void setEnvironment(String environment) { this.environment = environment; }
    public String getHostname() { return hostname; }
    public void setHostname(String hostname) { this.hostname = hostname; }
    public Map<String, Object> getContext() { return context; }
    public void setContext(Map<String, Object> context) { this.context = context; }
    public String getStackTrace() { return stackTrace; }
    public void setStackTrace(String stackTrace) { this.stackTrace = stackTrace; }
    public String getSourceFile() { return sourceFile; }
    public void setSourceFile(String sourceFile) { this.sourceFile = sourceFile; }
    public String getSourceLine() { return sourceLine; }
    public void setSourceLine(String sourceLine) { this.sourceLine = sourceLine; }

    @Override
    public String toString() {
        return String.format("LogEntry[level=%s, logger=%s, message=%s, app=%s]",
                level, logger, message, application);
    }
}
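
A quick round trip through the model verifies that serialization is symmetric (a standalone sketch; the field values are illustrative):

// LogEntryDemo.java -- round-trip sketch for the model above
public class LogEntryDemo {
    public static void main(String[] args) {
        LogEntry entry = LogEntry.builder()
                .level("ERROR")
                .logger("com.example.Checkout")
                .message("Payment declined")
                .application("payment-service")
                .context("order_id", "ord-42")
                .build();
        byte[] bytes = entry.toBytes();                // Jackson -> JSON bytes
        LogEntry restored = LogEntry.fromBytes(bytes); // JSON bytes -> LogEntry
        System.out.println(restored);                  // LogEntry[level=ERROR, ...]
        System.out.println(restored.toJson());         // full JSON payload
    }
}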

3. Log Producer

// LogProducer.java
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class LogProducer implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(LogProducer.class);
    private final KafkaProducer<String, byte[]> producer;
    private final String topic;
    private final AtomicLong messageCount;
    private final AtomicLong errorCount;
    private final boolean async;

    public LogProducer(String bootstrapServers, String topic) {
        this(bootstrapServers, topic, true);
    }

    public LogProducer(String bootstrapServers, String topic, boolean async) {
        Properties props = KafkaConfig.getProducerProperties(bootstrapServers);
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.messageCount = new AtomicLong(0);
        this.errorCount = new AtomicLong(0);
        this.async = async;
        logger.info("LogProducer initialized for topic: {}", topic);
    }

    public void send(LogEntry logEntry) {
        send(logEntry, null);
    }

    public void send(LogEntry logEntry, String key) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                topic, key, logEntry.toBytes()
        );
        if (async) {
            sendAsync(record);
        } else {
            sendSync(record);
        }
    }

    private void sendAsync(ProducerRecord<String, byte[]> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                errorCount.incrementAndGet();
                logger.error("Failed to send log entry to Kafka", exception);
            } else {
                messageCount.incrementAndGet();
                if (logger.isDebugEnabled()) {
                    logger.debug("Successfully sent log entry to topic {} partition {} offset {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
    }

    private void sendSync(ProducerRecord<String, byte[]> record) {
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get(); // block until acknowledged
            messageCount.incrementAndGet();
            if (logger.isDebugEnabled()) {
                logger.debug("Sync send completed for topic {} partition {} offset {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        } catch (Exception e) {
            errorCount.incrementAndGet();
            logger.error("Failed to send log entry to Kafka", e);
        }
    }

    public void flush() {
        producer.flush();
        logger.info("Producer flushed. Total messages: {}, errors: {}",
                messageCount.get(), errorCount.get());
    }

    @Override
    public void close() {
        flush();
        producer.close();
        logger.info("LogProducer closed");
    }

    // Statistics
    public long getMessageCount() {
        return messageCount.get();
    }

    public long getErrorCount() {
        return errorCount.get();
    }

    public void resetStats() {
        messageCount.set(0);
        errorCount.set(0);
    }
}
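
Since LogProducer implements AutoCloseable, try-with-resources guarantees a final flush before the process exits (a sketch; the broker address and topic are placeholders):

// Sketch: send one entry and flush on close
try (LogProducer producer = new LogProducer("localhost:9092", "raw-logs")) {
    LogEntry entry = LogEntry.builder()
            .level("INFO")
            .logger("com.example.Startup")
            .message("service started")
            .application("web-app")
            .build();
    producer.send(entry, entry.getApplication()); // keying by app keeps one app's logs ordered
} // close() -> flush() + producer.close()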

4. Log Consumer and Processor

// LogConsumer.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class LogConsumer implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(LogConsumer.class);
    private final KafkaConsumer<String, byte[]> consumer;
    private final List<String> topics;
    private final LogProcessor processor;
    private final AtomicBoolean running;
    private final AtomicLong processedCount;
    private final AtomicLong errorCount;
    private final long pollTimeoutMs;
    private Thread consumerThread;

    public LogConsumer(String bootstrapServers, String groupId, List<String> topics,
                       LogProcessor processor) {
        this(bootstrapServers, groupId, topics, processor, 5000, 500);
    }

    public LogConsumer(String bootstrapServers, String groupId, List<String> topics,
                       LogProcessor processor, long pollTimeoutMs, int maxPollRecords) {
        Properties props = KafkaConfig.getHighThroughputConsumerConfig(bootstrapServers, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // honor the caller's batch size
        this.consumer = new KafkaConsumer<>(props);
        this.topics = topics;
        this.processor = processor;
        this.running = new AtomicBoolean(false);
        this.processedCount = new AtomicLong(0);
        this.errorCount = new AtomicLong(0);
        this.pollTimeoutMs = pollTimeoutMs;
        logger.info("LogConsumer initialized for topics: {} with group: {}", topics, groupId);
    }

    public void start() {
        if (!running.compareAndSet(false, true)) {
            logger.warn("LogConsumer is already running");
            return;
        }
        consumer.subscribe(topics);
        consumerThread = new Thread(this::runConsumer, "log-consumer-thread");
        consumerThread.setDaemon(true);
        consumerThread.start();
        logger.info("LogConsumer started for topics: {}", topics);
    }

    public void stop() {
        running.set(false);
        // wakeup() is the only thread-safe KafkaConsumer method; it aborts a blocked poll()
        consumer.wakeup();
        if (consumerThread != null) {
            try {
                consumerThread.join(10000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while stopping consumer thread");
            }
        }
    }

    private void runConsumer() {
        logger.info("Starting log consumption loop");
        while (running.get()) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                if (!records.isEmpty()) {
                    Map<TopicPartition, OffsetAndMetadata> offsets = processRecords(records);
                    commitOffsets(offsets);
                }
            } catch (WakeupException e) {
                // Raised by stop(); fall through and let the loop condition exit
            } catch (Exception e) {
                errorCount.incrementAndGet();
                logger.error("Error in consumer loop", e);
                // Brief pause before retry
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        logger.info("Log consumption loop stopped");
    }

    private Map<TopicPartition, OffsetAndMetadata> processRecords(ConsumerRecords<String, byte[]> records) {
        List<LogEntry> logEntries = new ArrayList<>(records.count());
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (ConsumerRecord<String, byte[]> record : records) {
            try {
                logEntries.add(LogEntry.fromBytes(record.value()));
            } catch (Exception e) {
                errorCount.incrementAndGet();
                logger.error("Failed to process record from topic {} partition {} offset {}",
                        record.topic(), record.partition(), record.offset(), e);
            }
            // Track the next offset to consume, even for undecodable records, so we do not re-read them
            offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        if (!logEntries.isEmpty()) {
            try {
                processor.process(logEntries);
                processedCount.addAndGet(logEntries.size());
                if (logger.isDebugEnabled()) {
                    logger.debug("Processed {} log entries. Total processed: {}",
                            logEntries.size(), processedCount.get());
                }
            } catch (Exception e) {
                errorCount.incrementAndGet();
                // Note: offsets are still committed below, so a failed batch is dropped
                // rather than reprocessed (at-most-once for the batch)
                logger.error("Failed to process batch of {} log entries", logEntries.size(), e);
            }
        }
        return offsetsToCommit;
    }

    private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            consumer.commitSync(offsets);
        } catch (Exception e) {
            logger.error("Failed to commit offsets", e);
        }
    }

    // NOTE: KafkaConsumer is single-threaded; call the seek methods only before start()
    public void seekToBeginning() {
        consumer.seekToBeginning(consumer.assignment());
        logger.info("Seeked to beginning of all partitions");
    }

    public void seekToEnd() {
        consumer.seekToEnd(consumer.assignment());
        logger.info("Seeked to end of all partitions");
    }

    @Override
    public void close() {
        stop();
        consumer.close();
        logger.info("LogConsumer closed. Total processed: {}, errors: {}",
                processedCount.get(), errorCount.get());
    }

    // Statistics
    public long getProcessedCount() {
        return processedCount.get();
    }

    public long getErrorCount() {
        return errorCount.get();
    }
}
// LogProcessor.java
import org.slf4j.LoggerFactory;
import java.util.List;

public interface LogProcessor {
    void process(List<LogEntry> logEntries);

    default void onError(List<LogEntry> logEntries, Exception e) {
        LoggerFactory.getLogger(getClass()).error("Failed to process {} log entries",
                logEntries.size(), e);
    }
}
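
Because LogProcessor has a single abstract method, ad-hoc processors can be written as lambdas. A small sketch (the counting logic is illustrative):

// Sketch: an inline processor that counts ERROR entries per batch
LogProcessor errorCounter = entries -> {
    long errors = entries.stream()
            .filter(e -> "ERROR".equals(e.getLevel()))
            .count();
    if (errors > 0) {
        System.out.println(errors + " error entries in this batch");
    }
};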

5. Log Processing Implementations

// ElasticsearchLogProcessor.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

public class ElasticsearchLogProcessor implements LogProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchLogProcessor.class);
    private final ElasticsearchClient esClient;
    private final String indexPrefix;
    private final boolean bulkIndexing;

    public ElasticsearchLogProcessor(ElasticsearchClient esClient, String indexPrefix) {
        this(esClient, indexPrefix, true);
    }

    public ElasticsearchLogProcessor(ElasticsearchClient esClient, String indexPrefix, boolean bulkIndexing) {
        this.esClient = esClient;
        this.indexPrefix = indexPrefix;
        this.bulkIndexing = bulkIndexing;
    }

    @Override
    public void process(List<LogEntry> logEntries) {
        if (bulkIndexing) {
            processBulk(logEntries);
        } else {
            processIndividual(logEntries);
        }
    }

    private void processBulk(List<LogEntry> logEntries) {
        try {
            // Group documents by their daily index so each bulk request targets a
            // concrete index (a wildcard pattern is not a valid write target)
            Map<String, Map<String, Map<String, Object>>> docsByIndex = new HashMap<>();
            for (LogEntry entry : logEntries) {
                docsByIndex.computeIfAbsent(buildIndexName(entry), k -> new HashMap<>())
                        .put(entry.getId(), convertToMap(entry));
            }
            for (Map.Entry<String, Map<String, Map<String, Object>>> batch : docsByIndex.entrySet()) {
                esClient.bulkIndex(batch.getKey(), batch.getValue());
            }
            logger.debug("Bulk indexed {} log entries to Elasticsearch", logEntries.size());
        } catch (Exception e) {
            logger.error("Failed to bulk index {} log entries", logEntries.size(), e);
            throw new RuntimeException("Elasticsearch bulk indexing failed", e);
        }
    }

    private void processIndividual(List<LogEntry> logEntries) {
        int successCount = 0;
        for (LogEntry entry : logEntries) {
            try {
                esClient.index(buildIndexName(entry), entry.getId(), convertToMap(entry));
                successCount++;
            } catch (Exception e) {
                logger.error("Failed to index log entry: {}", entry.getId(), e);
            }
        }
        if (successCount < logEntries.size()) {
            logger.warn("Only indexed {}/{} log entries successfully", successCount, logEntries.size());
        }
    }

    private String buildIndexName(LogEntry entry) {
        // Create a daily index, e.g. logs-2023.12.01 (ISO date with dots, the usual ES convention)
        String datePart = entry.getTimestamp().substring(0, 10).replace("-", ".");
        return String.format("%s-%s", indexPrefix, datePart);
    }

    private Map<String, Object> convertToMap(LogEntry entry) {
        Map<String, Object> map = new HashMap<>();
        map.put("id", entry.getId());
        map.put("@timestamp", entry.getTimestamp());
        map.put("level", entry.getLevel());
        map.put("logger", entry.getLogger());
        map.put("message", entry.getMessage());
        map.put("thread", entry.getThread());
        map.put("application", entry.getApplication());
        map.put("environment", entry.getEnvironment());
        map.put("hostname", entry.getHostname());
        map.put("context", entry.getContext());
        map.put("stack_trace", entry.getStackTrace());
        map.put("source_file", entry.getSourceFile());
        map.put("source_line", entry.getSourceLine());
        return map;
    }
}
// ConsoleLogProcessor.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;

public class ConsoleLogProcessor implements LogProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ConsoleLogProcessor.class);
    private final boolean prettyPrint;

    public ConsoleLogProcessor() {
        this(false);
    }

    public ConsoleLogProcessor(boolean prettyPrint) {
        this.prettyPrint = prettyPrint;
    }

    @Override
    public void process(List<LogEntry> logEntries) {
        for (LogEntry entry : logEntries) {
            if (prettyPrint) {
                System.out.printf("[%s] %s %s - %s%n",
                        entry.getTimestamp(),
                        entry.getLevel(),
                        entry.getLogger(),
                        entry.getMessage());
                if (entry.getStackTrace() != null) {
                    System.out.println("Stack Trace: " + entry.getStackTrace());
                }
            } else {
                System.out.println(entry.toJson());
            }
        }
        logger.debug("Processed {} log entries to console", logEntries.size());
    }
}
// Mock Elasticsearch client for demonstration (placed in ElasticsearchLogProcessor.java,
// which already imports java.util.Map)
class ElasticsearchClient {
    public void index(String index, String id, Map<String, Object> document) {
        // Mock implementation
        System.out.printf("Indexing to %s: %s%n", index, id);
    }

    public void bulkIndex(String index, Map<String, Map<String, Object>> documents) {
        // Mock implementation
        System.out.printf("Bulk indexing %d documents to %s%n", documents.size(), index);
    }
}
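
In a real deployment the mock above would be replaced by an actual client. Below is a hedged sketch against the official elasticsearch-java 8.x client; the class names follow its documented setup, but verify them against the client version you use (note the name clash with the mock ElasticsearchClient above, hence the fully qualified name):

// Sketch: wiring the official Elasticsearch Java client (8.x API, an assumption here)
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
co.elastic.clients.elasticsearch.ElasticsearchClient es =
        new co.elastic.clients.elasticsearch.ElasticsearchClient(
                new RestClientTransport(restClient, new JacksonJsonpMapper()));

// Index one document; 'entry' and convertToMap() refer to the processor above
es.index(i -> i.index("logs-2023.12.01")
        .id(entry.getId())
        .document(convertToMap(entry)));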

6. Log Enrichment and Transformation

// LogEnricher.java
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;

public class LogEnricher implements LogProcessor {
    private final LogProcessor delegate;
    private final List<Function<LogEntry, LogEntry>> enrichers;

    public LogEnricher(LogProcessor delegate) {
        this.delegate = delegate;
        this.enrichers = List.of(
                this::enrichWithSeverity,
                this::enrichWithErrorType,
                this::enrichWithMessageHash
        );
    }

    @Override
    public void process(List<LogEntry> logEntries) {
        List<LogEntry> enrichedEntries = logEntries.stream()
                .map(this::applyEnrichers)
                .toList();
        delegate.process(enrichedEntries);
    }

    private LogEntry applyEnrichers(LogEntry entry) {
        if (entry.getContext() == null) {
            entry.setContext(new HashMap<>()); // the enrichers below write into the context map
        }
        LogEntry enriched = entry;
        for (Function<LogEntry, LogEntry> enricher : enrichers) {
            enriched = enricher.apply(enriched);
        }
        return enriched;
    }

    private LogEntry enrichWithSeverity(LogEntry entry) {
        if (!entry.getContext().containsKey("severity")) {
            entry.getContext().put("severity", calculateSeverity(entry.getLevel()));
        }
        return entry;
    }

    private LogEntry enrichWithErrorType(LogEntry entry) {
        if ("ERROR".equals(entry.getLevel()) && entry.getStackTrace() != null) {
            entry.getContext().put("error_type", extractErrorType(entry.getStackTrace()));
        }
        return entry;
    }

    private LogEntry enrichWithMessageHash(LogEntry entry) {
        if (entry.getMessage() != null) {
            entry.getContext().put("message_hash", Integer.toHexString(entry.getMessage().hashCode()));
        }
        return entry;
    }

    private int calculateSeverity(String level) {
        if (level == null) return 0;
        return switch (level.toUpperCase()) {
            case "TRACE" -> 10;
            case "DEBUG" -> 20;
            case "INFO" -> 30;
            case "WARN" -> 40;
            case "ERROR" -> 50;
            case "FATAL" -> 60;
            default -> 0;
        };
    }

    private String extractErrorType(String stackTrace) {
        if (stackTrace == null || stackTrace.isEmpty()) {
            return "Unknown";
        }
        // The first line of a stack trace looks like "com.example.SomeException: message"
        String firstLine = stackTrace.split("\n")[0].trim();
        if (firstLine.contains(":")) {
            return firstLine.substring(0, firstLine.indexOf(':')).trim();
        }
        return firstLine;
    }
}
// LogFilter.java
import java.util.List;
import java.util.function.Predicate;

public class LogFilter implements LogProcessor {
    private final LogProcessor delegate;
    private final Predicate<LogEntry> filter;

    public LogFilter(LogProcessor delegate, Predicate<LogEntry> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override
    public void process(List<LogEntry> logEntries) {
        List<LogEntry> filteredEntries = logEntries.stream()
                .filter(filter)
                .toList();
        if (!filteredEntries.isEmpty()) {
            delegate.process(filteredEntries);
        }
    }

    // Common filter predicates
    public static Predicate<LogEntry> levelAtLeast(String minLevel) {
        List<String> levels = List.of("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL");
        int minLevelIndex = levels.indexOf(minLevel.toUpperCase());
        if (minLevelIndex < 0) {
            throw new IllegalArgumentException("Unknown log level: " + minLevel);
        }
        return entry -> {
            if (entry.getLevel() == null) return false;
            // Entries with unrecognized levels index to -1 and are filtered out
            return levels.indexOf(entry.getLevel().toUpperCase()) >= minLevelIndex;
        };
    }

    public static Predicate<LogEntry> fromApplication(String application) {
        return entry -> application.equals(entry.getApplication());
    }

    public static Predicate<LogEntry> containsKeyword(String keyword) {
        return entry -> entry.getMessage() != null &&
                entry.getMessage().toLowerCase().contains(keyword.toLowerCase());
    }
}
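
The decorators compose by wrapping: each processor hands its output to the delegate it was constructed with. For example (the keyword and level below are illustrative):

// Sketch: enrich first, then keep only ERROR+ entries mentioning "timeout", then print
LogProcessor sink = new ConsoleLogProcessor(true);
LogProcessor filtered = new LogFilter(sink,
        LogFilter.levelAtLeast("ERROR").and(LogFilter.containsKeyword("timeout")));
LogProcessor pipeline = new LogEnricher(filtered);

pipeline.process(List.of(
        LogEntry.builder()
                .level("ERROR")
                .logger("com.example.Db")
                .message("Connection timeout after 30s")
                .build()));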

7. Pipeline Manager and Monitoring

// LogPipelineManager.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LogPipelineManager {
    private static final Logger logger = LoggerFactory.getLogger(LogPipelineManager.class);
    private final List<LogConsumer> consumers;
    private final List<LogProducer> producers;
    private final ScheduledExecutorService monitorScheduler;

    public LogPipelineManager() {
        this.consumers = new ArrayList<>();
        this.producers = new ArrayList<>();
        this.monitorScheduler = Executors.newScheduledThreadPool(1);
    }

    public void addConsumer(LogConsumer consumer) {
        consumers.add(consumer);
    }

    public void addProducer(LogProducer producer) {
        producers.add(producer);
    }

    public void start() {
        logger.info("Starting log pipeline with {} consumers and {} producers",
                consumers.size(), producers.size());
        // Start all consumers
        for (LogConsumer consumer : consumers) {
            consumer.start();
        }
        // Start monitoring
        startMonitoring();
        logger.info("Log pipeline started successfully");
    }

    public void stop() {
        logger.info("Stopping log pipeline");
        // Stop monitoring first
        monitorScheduler.shutdown();
        // Stop all consumers
        for (LogConsumer consumer : consumers) {
            consumer.stop();
        }
        // Close all producers
        for (LogProducer producer : producers) {
            producer.close();
        }
        logger.info("Log pipeline stopped");
    }

    private void startMonitoring() {
        monitorScheduler.scheduleAtFixedRate(() -> {
            try {
                printPipelineStats();
            } catch (Exception e) {
                logger.error("Error in pipeline monitoring", e);
            }
        }, 30, 30, TimeUnit.SECONDS);
    }

    // Stats are aggregated on demand from the consumers and producers themselves,
    // so the manager keeps no counters of its own
    private void printPipelineStats() {
        logger.info("Pipeline Stats - {}", getStats());
    }

    public PipelineStats getStats() {
        return new PipelineStats(
                consumers.stream().mapToLong(LogConsumer::getProcessedCount).sum(),
                consumers.stream().mapToLong(LogConsumer::getErrorCount).sum(),
                producers.stream().mapToLong(LogProducer::getMessageCount).sum(),
                producers.stream().mapToLong(LogProducer::getErrorCount).sum()
        );
    }

    public static class PipelineStats {
        public final long totalProcessed;
        public final long totalErrors;
        public final long totalProduced;
        public final long totalProducerErrors;

        public PipelineStats(long processed, long errors, long produced, long producerErrors) {
            this.totalProcessed = processed;
            this.totalErrors = errors;
            this.totalProduced = produced;
            this.totalProducerErrors = producerErrors;
        }

        @Override
        public String toString() {
            return String.format(
                    "PipelineStats{processed=%,d, errors=%,d, produced=%,d, producerErrors=%,d}",
                    totalProcessed, totalErrors, totalProduced, totalProducerErrors
            );
        }
    }
}
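
Outside a demo, the manager's stop() is typically tied to JVM shutdown so that Ctrl+C or SIGTERM drains the pipeline cleanly (a sketch):

// Sketch: stop the pipeline on JVM shutdown
LogPipelineManager manager = new LogPipelineManager();
// ... addConsumer(...) / addProducer(...) ...
manager.start();
Runtime.getRuntime().addShutdownHook(new Thread(manager::stop, "pipeline-shutdown"));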

8. Complete Usage Example

// LogIngestionPipelineDemo.java
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LogIngestionPipelineDemo {
    public static void main(String[] args) {
        // Configuration
        String bootstrapServers = "localhost:9092";
        String inputTopic = "raw-logs";
        String processedTopic = "processed-logs";
        String consumerGroup = "log-ingestion-pipeline";
        // Create pipeline manager
        LogPipelineManager pipelineManager = new LogPipelineManager();
        ScheduledExecutorService logGenerator = null;
        try {
            // Create and configure producers
            LogProducer rawLogProducer = new LogProducer(bootstrapServers, inputTopic);
            LogProducer processedLogProducer = new LogProducer(bootstrapServers, processedTopic);
            pipelineManager.addProducer(rawLogProducer);
            pipelineManager.addProducer(processedLogProducer);
            // Create processing chain
            LogProcessor processingChain = createProcessingChain(processedLogProducer);
            // Create consumer
            LogConsumer consumer = new LogConsumer(
                    bootstrapServers,
                    consumerGroup,
                    List.of(inputTopic),
                    processingChain
            );
            pipelineManager.addConsumer(consumer);
            // Start the pipeline
            pipelineManager.start();
            // Simulate log generation
            logGenerator = startLogGeneration(rawLogProducer);
            // Let it run for a while
            Thread.sleep(60000);
            // Print final stats
            System.out.println("Final stats: " + pipelineManager.getStats());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (logGenerator != null) {
                logGenerator.shutdownNow(); // stop the generator threads so the JVM can exit
            }
            pipelineManager.stop();
        }
    }

    private static LogProcessor createProcessingChain(LogProducer outputProducer) {
        // Processing chain: Enrich -> Filter (WARN and above) -> fan out to the console,
        // Elasticsearch, and the processed-logs topic
        LogProcessor consoleProcessor = new ConsoleLogProcessor(true);
        LogProcessor esProcessor = new ElasticsearchLogProcessor(new ElasticsearchClient(), "logs");
        LogProcessor fanOut = entries -> {
            consoleProcessor.process(entries);
            esProcessor.process(entries);
            entries.forEach(entry -> outputProducer.send(entry, entry.getApplication()));
        };
        LogProcessor filter = new LogFilter(fanOut, LogFilter.levelAtLeast("WARN"));
        return new LogEnricher(filter);
    }

    private static ScheduledExecutorService startLogGeneration(LogProducer producer) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Random random = new Random();
        String[] levels = {"DEBUG", "INFO", "WARN", "ERROR"};
        String[] applications = {"web-app", "auth-service", "payment-service", "user-service"};
        scheduler.scheduleAtFixedRate(() -> {
            for (int i = 0; i < 10; i++) {
                String level = levels[random.nextInt(levels.length)];
                String application = applications[random.nextInt(applications.length)];
                LogEntry logEntry = LogEntry.builder()
                        .level(level)
                        .logger("com.example." + application)
                        .message("Sample log message from " + application + " with level " + level)
                        .application(application)
                        .environment("production")
                        .hostname("host-" + random.nextInt(10))
                        .context("request_id", "req-" + random.nextInt(1000))
                        .build();
                if ("ERROR".equals(level)) {
                    logEntry.setStackTrace("java.lang.RuntimeException: Something went wrong\n" +
                            "  at com.example.Test.main(Test.java:10)");
                }
                producer.send(logEntry, application); // use application as key for partitioning
            }
        }, 0, 1, TimeUnit.SECONDS);
        return scheduler; // caller is responsible for shutting this down
    }
}

Key Features:

  1. High Throughput: Producer and consumer settings tuned for log ingestion (batching, compression, large fetches)
  2. Flexible Processing: Pluggable processor architecture with filtering and enrichment decorators
  3. Error Handling: Per-record and per-batch error handling, with the consumer loop retrying after failures
  4. Monitoring: Built-in counters and periodic pipeline statistics
  5. Scalable: Designed for horizontal scaling via Kafka consumer groups
  6. Multiple Outputs: Support for Elasticsearch, console, and other destinations
  7. JSON Serialization: Log entries serialized as compact JSON bytes via Jackson
  8. Resource Management: AutoCloseable producers and consumers with orderly shutdown

With enough partitions and consumer instances, this pipeline can handle millions of log events per day and provides a solid foundation for an enterprise-grade log ingestion system.
