Building Real-Time Data Pipelines: Kafka Producer and Consumer in Java

Apache Kafka has become the de facto standard for building real-time data pipelines and streaming applications. At the heart of Kafka are Producers (that publish data to topics) and Consumers (that read data from topics). This article provides a comprehensive guide to implementing both in Java, covering configuration, best practices, and real-world examples.


Kafka Core Concepts

Before diving into code, let's review key Kafka concepts:

  • Topic: A named category/feed to which records are published (topics can be created from the CLI or programmatically, as sketched after this list)
  • Partition: Topics are split into partitions for parallelism and scalability
  • Producer: Applications that publish (write) data to Kafka topics
  • Consumer: Applications that subscribe to (read) data from topics
  • Broker: A Kafka server that stores data and serves clients
  • Cluster: Multiple brokers working together
  • Consumer Group: A set of consumers sharing a group.id that divide a topic's partitions among themselves for parallel consumption
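
For instance, topics are often created with Kafka's CLI tools, but they can also be created programmatically. A minimal sketch using the AdminClient that ships with kafka-clients (the broker address, partition count, and replication factor are illustrative single-broker dev values):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions for parallelism, replication factor 1 (dev setup only)
            NewTopic topic = new NewTopic("test-topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic created");
        }
    }
}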

Project Setup

First, add the Kafka client dependency:

Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

Gradle:

implementation 'org.apache.kafka:kafka-clients:3.6.1'

Kafka Producer Implementation

The Kafka Producer sends records to Kafka topics. Here's how to implement it:

Basic Producer Setup:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Configure producer properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer; try-with-resources closes (and flushes) it
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            // Create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "key1", "Hello, Kafka!");

            // Send the record asynchronously; delivery is confirmed only via
            // the returned Future or a callback, not by this call returning
            producer.send(record);
            System.out.println("Message queued for sending");
        }
    }
}
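
Note that send() returns a Future<RecordMetadata>, so the call above is effectively fire-and-forget. If the caller must block until the broker acknowledges the write, a minimal variant replaces the producer.send(record) line with (the enclosing method then has to handle the checked exceptions that Future.get() throws):

// Blocks until the broker acknowledges the write (or the send fails)
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Acknowledged: partition=%d, offset=%d%n",
        metadata.partition(), metadata.offset());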

Advanced Producer with Callbacks:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class AdvancedProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Optional: tune for reliability/performance
        properties.put(ProducerConfig.ACKS_CONFIG, "all");       // Wait for all in-sync replicas
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);        // Retry on transient failures
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);     // Wait up to 10 ms to fill a batch
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB batches

        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                String value = "Message " + i + " at " + System.currentTimeMillis();
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test-topic", key, value);

                // Send with a callback to handle the broker's response
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.printf("Message sent successfully! Topic: %s, " +
                                            "Partition: %d, Offset: %d, Key: %s%n",
                                    metadata.topic(), metadata.partition(),
                                    metadata.offset(), key);
                        } else {
                            System.err.println("Failed to send message: " + exception.getMessage());
                        }
                    }
                });
            }
            // Flush to ensure all buffered messages are sent
            producer.flush();
        }
    }
}
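
One hedged addition to the tuning block above: recent client versions support idempotent producing, which lets the broker de-duplicate batches that were retried after a transient failure. This configuration sketch is optional, not something the example requires:

// Broker de-duplicates retried batches, preserving per-partition ordering;
// the client enforces that this is combined with acks=all
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Idempotence allows at most 5 in-flight requests per connection
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);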

Producer with Custom Objects:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

// Custom data class
class UserEvent {
    private String userId;
    private String action;
    private long timestamp;

    // Jackson needs a no-argument constructor to deserialize this class
    public UserEvent() {
    }

    public UserEvent(String userId, String action, long timestamp) {
        this.userId = userId;
        this.action = action;
        this.timestamp = timestamp;
    }

    // Getters (Jackson uses these to serialize the object)
    public String getUserId() { return userId; }
    public String getAction() { return action; }
    public long getTimestamp() { return timestamp; }
}

public class ObjectProducer {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            UserEvent event = new UserEvent("user123", "login", System.currentTimeMillis());
            String eventJson = objectMapper.writeValueAsString(event);
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("user-events", event.getUserId(), eventJson);
            producer.send(record);
            System.out.println("User event sent: " + eventJson);
        }
    }
}
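
The example above serializes to JSON before handing Kafka a String. Alternatively, that logic can live in a custom Serializer so the producer can be typed as Producer<String, UserEvent>. A minimal Jackson-based sketch (UserEventSerializer is a name introduced here, not part of the Kafka API):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserEventSerializer implements Serializer<UserEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, UserEvent data) {
        try {
            return data == null ? null : objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize UserEvent", e);
        }
    }
}

It would then be registered with properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserEventSerializer.class.getName()).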

Kafka Consumer Implementation

The Kafka Consumer reads records from Kafka topics:

Basic Consumer:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        // Configure consumer properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Or "latest"

        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList("test-topic"));

            // Poll for new messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: Key = %s, Value = %s, " +
                                    "Partition = %d, Offset = %d%n",
                            record.key(), record.value(),
                            record.partition(), record.offset());
                }
            }
        }
    }
}

Advanced Consumer with Manual Offset Control:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class ManualOffsetConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit

        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record
                    System.out.printf("Processing: Key = %s, Value = %s%n",
                            record.key(), record.value());
                    processMessage(record);
                }
                // Manually commit offsets after the whole batch is processed
                consumer.commitSync();
                System.out.println("Offsets committed");
            }
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // Add your business logic here
        try {
            Thread.sleep(100); // Simulate processing time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
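
commitSync() with no arguments commits the latest polled offsets for all assigned partitions at once. When a crash mid-batch must not replay already-processed records, offsets can instead be committed per record; a sketch of a drop-in replacement for the record loop above (note the + 1: the committed offset is the next offset the group should read):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;

public class PerRecordCommit {
    static void processAndCommit(Consumer<String, String> consumer,
                                 ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Processing: Key = %s, Value = %s%n",
                    record.key(), record.value());
            // Commit this record's partition up to the next offset to read
            consumer.commitSync(Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)));
        }
    }
}

Per-record synchronous commits trade throughput for a stronger guarantee; committing once per batch, as above, is the usual middle ground.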

Consumer with Custom Object Deserialization:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ObjectConsumer {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-consumer");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("user-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        UserEvent event = objectMapper.readValue(record.value(), UserEvent.class);
                        System.out.printf("Processed user event: UserId = %s, Action = %s%n",
                                event.getUserId(), event.getAction());
                    } catch (Exception e) {
                        System.err.println("Failed to parse message: " + e.getMessage());
                    }
                }
            }
        }
    }
}
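
As on the producer side, the Jackson call can move into a custom Deserializer so the consumer is typed as Consumer<String, UserEvent>. A minimal sketch (UserEventDeserializer is a name introduced here; Jackson instantiates UserEvent through its no-argument constructor):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class UserEventDeserializer implements Deserializer<UserEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public UserEvent deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : objectMapper.readValue(data, UserEvent.class);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize UserEvent", e);
        }
    }
}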

Complete Producer-Consumer Example

Here's a complete example showing both producer and consumer working together:

Event Producer:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class EventProducer {
    private final Producer<String, String> producer;
    private final String topic;

    public EventProducer(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void sendEvent(String key, String eventType, String data) {
        String value = String.format("{\"type\":\"%s\",\"data\":\"%s\",\"timestamp\":%d}",
                eventType, data, System.currentTimeMillis());
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to send event: " + exception.getMessage());
            } else {
                System.out.printf("Event sent: Key=%s, Partition=%d, Offset=%d%n",
                        key, metadata.partition(), metadata.offset());
            }
        });
    }

    public void close() {
        producer.close();
    }
}

Event Consumer:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EventConsumer {
    private final Consumer<String, String> consumer;
    private final String topic;
    private volatile boolean running = true;

    public EventConsumer(String bootstrapServers, String topic, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    public void startConsuming() {
        consumer.subscribe(Collections.singletonList(topic));
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                processEvent(record.key(), record.value());
            }
        }
        consumer.close();
    }

    private void processEvent(String key, String value) {
        System.out.printf("Processing event - Key: %s, Value: %s%n", key, value);
        // Add business logic here
    }

    public void stop() {
        running = false;
    }
}

Main Application:

public class KafkaApplication {
    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "localhost:9092";
        String topic = "events";

        // Start the consumer in a separate thread
        EventConsumer consumer = new EventConsumer(bootstrapServers, topic, "event-processors");
        Thread consumerThread = new Thread(consumer::startConsuming);
        consumerThread.start();

        // Produce events
        EventProducer producer = new EventProducer(bootstrapServers, topic);
        for (int i = 0; i < 10; i++) {
            producer.sendEvent("user-" + i, "click", "button-" + (i % 3));
            Thread.sleep(1000);
        }
        producer.close();

        // Give the consumer time to drain, then shut it down
        Thread.sleep(5000);
        consumer.stop();
        consumerThread.join();
    }
}

Key Configuration Parameters

Important Producer Configs:

  • bootstrap.servers: Kafka broker addresses
  • key.serializer: Serializer for keys
  • value.serializer: Serializer for values
  • acks: Reliability setting (0, 1, or all)
  • retries: Number of retries on failure
  • batch.size: Batch size in bytes
  • linger.ms: Time to wait for batching

Important Consumer Configs:

  • bootstrap.servers: Kafka broker addresses
  • key.deserializer: Deserializer for keys
  • value.deserializer: Deserializer for values
  • group.id: Consumer group identifier
  • auto.offset.reset: What to do when no offset exists (earliest/latest)
  • enable.auto.commit: Whether to auto-commit offsets
  • max.poll.records: Maximum records returned per poll (see the tuning sketch after this list)
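
A few of these do not appear in the examples above; an illustrative consumer-tuning snippet (the values are starting points, not recommendations):

// Cap the batch each poll() returns so processing stays comfortably
// within the poll interval even when the consumer falls behind
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
// Commit manually after processing (see ManualOffsetConsumer above)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Read from the oldest retained record when the group has no committed offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");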

Best Practices

  1. Producer:
  • Use appropriate acks for your reliability needs
  • Implement retry logic with backoff
  • Use batching for better throughput
  • Always close producers properly
  2. Consumer:
  • Choose between auto and manual offset committing based on your needs
  • Handle rebalance scenarios properly (see the listener sketch after this list)
  • Process records in batches when possible
  • Monitor consumer lag
  3. General:
  • Use meaningful key values for partitioning
  • Monitor producer and consumer metrics
  • Test failure scenarios
  • Use schema evolution strategies (Avro, Protobuf)
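
On the rebalance point above: when partitions move between consumers in a group, a ConsumerRebalanceListener lets the application commit progress or clean up state before ownership changes. A minimal sketch for a manually committing consumer (currentOffsets is an illustrative name for offsets your processing loop tracks; Kafka does not maintain it for you):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class RebalanceAwareSubscriber {
    // Illustrative: offsets your loop has processed but not yet committed
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public void subscribe(Consumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("test-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Commit progress before losing ownership of these partitions
                        consumer.commitSync(currentOffsets);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Ownership gained; per-partition state could be (re)initialized here
                    }
                });
    }
}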

Error Handling and Resilience

Producer with Error Handling:

import org.apache.kafka.clients.producer.*;

public class ResilientProducer {
    public void sendWithRetry(Producer<String, String> producer,
                              ProducerRecord<String, String> record,
                              int maxRetries) {
        int attempt = 0;
        while (attempt < maxRetries) {
            try {
                producer.send(record).get(); // Block until the send completes
                return; // Success
            } catch (Exception e) {
                attempt++;
                System.err.printf("Attempt %d failed: %s%n", attempt, e.getMessage());
                if (attempt >= maxRetries) {
                    throw new RuntimeException("Failed after " + maxRetries + " attempts", e);
                }
                try {
                    // Exponential backoff: 100 ms, 200 ms, 400 ms, ...
                    Thread.sleep(100L << (attempt - 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during retry", ie);
                }
            }
        }
    }
}
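
The consumer side needs resilience too. KafkaConsumer is not thread-safe, but wakeup() may be called from another thread and makes a blocked poll() throw WakeupException, which is the standard way to stop a polling loop cleanly. A minimal sketch using a JVM shutdown hook:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GracefulConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "graceful-group");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();
        // wakeup() is the one KafkaConsumer method safe to call from another thread
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // Let the poll loop finish closing before the JVM exits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r -> System.out.println("Got: " + r.value()));
            }
        } catch (WakeupException e) {
            // Expected on shutdown; fall through to close()
        } finally {
            consumer.close(); // Commits offsets (with auto-commit) and leaves the group
        }
    }
}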

Conclusion

Kafka Producers and Consumers in Java provide powerful capabilities for building real-time data pipelines. Key takeaways:

  • Producers publish data to topics with configurable reliability guarantees
  • Consumers read data in consumer groups for parallel processing
  • Proper configuration is crucial for performance and reliability
  • Error handling and monitoring are essential for production systems
  • Schema management becomes important as systems evolve

By mastering these concepts and following best practices, you can build robust, scalable, and maintainable event-driven architectures with Kafka in Java.
