Overview
Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It provides a high-level API for continuous processing of structured data streams.
Key Concepts
- Input Sources: Kafka, File, Socket, etc.
- Output Sinks: Console, Kafka, File, Memory, etc.
- Output Modes: Append, Update, Complete
- Trigger Types: Processing time (micro-batch), Once / AvailableNow, Continuous (experimental)
- Watermarks: For handling late data
- Checkpointing: For fault tolerance (the sketch after this list shows where each concept appears in a query)
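These pieces come together in a single query definition. The snippet below is an illustrative sketch only: it assumes a SparkSession named spark, the imports shown in the basic example further down, and placeholder values for the topic name, checkpoint path, and intervals.
// Where each key concept appears in a streaming query (illustrative sketch)
Dataset<Row> source = spark.readStream()
        .format("kafka")                                        // input source
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .load();

StreamingQuery query = source
        .selectExpr("CAST(value AS STRING) AS value", "timestamp")
        .withWatermark("timestamp", "10 minutes")               // watermark for late data
        .groupBy(window(col("timestamp"), "5 minutes"))
        .count()
        .writeStream()
        .outputMode("update")                                   // output mode
        .format("console")                                      // output sink
        .trigger(Trigger.ProcessingTime("30 seconds"))          // trigger type
        .option("checkpointLocation", "/tmp/checkpoints/demo")  // checkpointing
        .start();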
Basic Setup
1. Maven Dependencies
<properties>
<spark.version>3.4.0</spark.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
2. Basic Streaming Application
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;
public class BasicStructuredStreaming {
public static void main(String[] args) throws Exception { // start() and awaitTermination() declare checked exceptions
// Create Spark Session
SparkSession spark = SparkSession.builder()
.appName("BasicStructuredStreaming")
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate();
// Create streaming DataFrame representing input from socket
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Split the lines into words
Dataset<Row> words = lines
.select(explode(split(col("value"), " ")).as("word"));
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("word").count();
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts
.writeStream()
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("1 second"))
.start();
query.awaitTermination();
}
}
Input Sources
1. Socket Source
public class SocketStreamExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("SocketStreamExample")
.master("local[2]")
.getOrCreate();
// Read from socket
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Process the data
Dataset<Row> processedDF = socketDF
.withColumn("timestamp", current_timestamp())
.withColumn("length", length(col("value")))
.withColumn("words", split(col("value"), " "))
.withColumn("word_count", size(col("words")));
// Write to console
StreamingQuery query = processedDF
.writeStream()
.outputMode("append")
.format("console")
.option("truncate", "false")
.start();
query.awaitTermination();
}
}
2. File Source
public class FileStreamExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("FileStreamExample")
.master("local[2]")
.getOrCreate();
// Define schema for JSON files
String schema = "timestamp timestamp, user_id string, action string, value double";
// Read JSON files from directory
Dataset<Row> fileDF = spark
.readStream()
.schema(schema)
.json("data/streaming/input/");
// Process the data
Dataset<Row> processedDF = fileDF
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "5 minutes"),
col("action")
)
.agg(
count("*").as("event_count"),
sum("value").as("total_value"),
avg("value").as("average_value")
)
.select(
col("window.start").as("window_start"),
col("window.end").as("window_end"),
col("action"),
col("event_count"),
col("total_value"),
col("average_value")
);
// Write to console
StreamingQuery query = processedDF
.writeStream()
.outputMode("update")
.format("console")
.option("truncate", "false")
.start();
query.awaitTermination();
}
}
3. Kafka Source
public class KafkaStreamExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("KafkaStreamExample")
.master("local[2]")
.getOrCreate();
// Read from Kafka
Dataset<Row> kafkaDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "latest")
.load();
// Parse JSON from Kafka value
Dataset<Row> parsedDF = kafkaDF
.select(
col("key").cast("string"),
col("value").cast("string"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp")
)
.filter(col("value").isNotNull())
.select(
col("key"),
from_json(col("value"),
        "user_id string, event_type string, timestamp timestamp, data string",
        new java.util.HashMap<>()  // no extra JSON parsing options
).as("data"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp")
)
.select(
col("key"),
col("data.user_id"),
col("data.event_type"),
col("data.timestamp"),
col("data.data"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp").as("kafka_timestamp")
);
// Process events with watermarks
Dataset<Row> processedDF = parsedDF
.withWatermark("timestamp", "1 minute")
.groupBy(
window(col("timestamp"), "5 minutes", "1 minute"),
col("event_type")
)
.agg(
count("*").as("event_count"),
approx_count_distinct("user_id").as("unique_users") // exact countDistinct is unsupported in streaming aggregations
)
.select(
col("window.start").as("window_start"),
col("window.end").as("window_end"),
col("event_type"),
col("event_count"),
col("unique_users")
);
// Write to console
StreamingQuery query = processedDF
.writeStream()
.outputMode("update")
.format("console")
.option("truncate", "false")
.start();
query.awaitTermination();
}
}
Output Sinks
1. Console Sink
public class ConsoleSinkExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("ConsoleSinkExample")
.master("local[2]")
.getOrCreate();
Dataset<Row> streamDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
Dataset<Row> resultDF = streamDF
.groupBy(col("value"))
.count();
// Different output modes
// Append: only newly added rows; valid for non-aggregated output
// (an aggregation needs a watermark before append mode can be used)
StreamingQuery appendQuery = streamDF
    .writeStream()
    .outputMode("append")
    .format("console")
    .start();
// Complete: the entire updated result table on every trigger
StreamingQuery completeQuery = resultDF
    .writeStream()
    .outputMode("complete")
    .format("console")
    .start();
// Update: only the rows that changed since the last trigger
StreamingQuery updateQuery = resultDF
    .writeStream()
    .outputMode("update")
    .format("console")
    .start();
// Block until any of the queries terminates
spark.streams().awaitAnyTermination();
}
}
2. File Sink
public class FileSinkExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("FileSinkExample")
.master("local[2]")
.getOrCreate();
Dataset<Row> streamDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
Dataset<Row> resultDF = streamDF
.withColumn("timestamp", current_timestamp())
.withColumn("processed_date", current_date());
// Write to Parquet files
StreamingQuery query = resultDF
.writeStream()
.outputMode("append")
.format("parquet")
.option("path", "data/streaming/output/")
.option("checkpointLocation", "data/streaming/checkpoint/")
.partitionBy("processed_date")
.start();
query.awaitTermination();
}
}
3. Kafka Sink
public class KafkaSinkExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("KafkaSinkExample")
.master("local[2]")
.getOrCreate();
// Read from socket
Dataset<Row> inputDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Process data
Dataset<Row> processedDF = inputDF
.select(
lit("key").as("key"),
to_json(struct(
col("value").as("message"),
current_timestamp().as("processed_at")
)).as("value")
);
// Write to Kafka
StreamingQuery query = processedDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.option("checkpointLocation", "data/streaming/checkpoint/kafka/")
.start();
query.awaitTermination();
}
}
4. ForeachBatch Sink (Custom Processing)
public class ForeachBatchExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("ForeachBatchExample")
.master("local[2]")
.getOrCreate();
Dataset<Row> streamDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
Dataset<Row> processedDF = streamDF
.groupBy(col("value"))
.count();
// Custom processing using foreachBatch
StreamingQuery query = processedDF
.writeStream()
.outputMode("update")
.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
System.out.println("Processing batch: " + batchId);
// Custom logic for each batch
batchDF.persist();
// Write to multiple sinks
batchDF.write().mode("append").json("data/batch-output/" + batchId);
batchDF.show();
// Update external database
updateExternalDatabase(batchDF);
batchDF.unpersist();
})
.start();
query.awaitTermination();
}
private static void updateExternalDatabase(Dataset<Row> df) {
// Custom database update logic
df.foreach(row -> {
String key = row.getString(0);
Long count = row.getLong(1);
// Update database with key and count
System.out.println("Updating database: " + key + " -> " + count);
});
}
}
Advanced Streaming Patterns
1. Windowed Aggregations
public class WindowedAggregations {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("WindowedAggregations")
.master("local[2]")
.getOrCreate();
// Read from Kafka with event timestamps
Dataset<Row> kafkaDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "metrics")
.load();
Dataset<Row> eventsDF = kafkaDF
.select(
from_json(col("value").cast("string"),
        "device_id string, metric string, value double, event_time timestamp",
        new java.util.HashMap<>()  // no extra JSON parsing options
).as("data")
)
.select(
col("data.device_id"),
col("data.metric"),
col("data.value"),
col("data.event_time")
);
// Tumbling window (fixed, non-overlapping)
Dataset<Row> tumblingWindows = eventsDF
.withWatermark("event_time", "10 minutes")
.groupBy(
window(col("event_time"), "5 minutes"),
col("metric")
)
.agg(
avg("value").as("avg_value"),
max("value").as("max_value"),
min("value").as("min_value"),
count("*").as("event_count")
);
// Sliding window (overlapping)
Dataset<Row> slidingWindows = eventsDF
.withWatermark("event_time", "10 minutes")
.groupBy(
window(col("event_time"), "10 minutes", "5 minutes"),
col("metric")
)
.agg(
avg("value").as("avg_value"),
count("*").as("event_count")
);
// Session windows (group events with gaps < threshold)
Dataset<Row> sessionWindows = eventsDF
.withWatermark("event_time", "10 minutes")
.groupBy(
session_window(col("event_time"), "5 minutes"),
col("device_id"),
col("metric")
)
.agg(
avg("value").as("avg_value"),
count("*").as("event_count"),
min("event_time").as("session_start"),
max("event_time").as("session_end")
);
// Write results (only the tumbling-window aggregation is started here; the sliding and session window queries can be started the same way)
StreamingQuery query = tumblingWindows
.writeStream()
.outputMode("update")
.format("console")
.option("truncate", "false")
.start();
query.awaitTermination();
}
}
2. Watermarking for Late Data
public class WatermarkExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("WatermarkExample")
.master("local[2]")
.getOrCreate();
Dataset<Row> eventsDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.select(
split(col("value"), ",").getItem(0).as("user_id"),
split(col("value"), ",").getItem(1).as("action"),
to_timestamp(split(col("value"), ",").getItem(2)).as("event_time")
);
// Apply watermark of 10 minutes
Dataset<Row> watermarkedDF = eventsDF
.withWatermark("event_time", "10 minutes");
// Windowed aggregation with watermark
Dataset<Row> windowedCounts = watermarkedDF
.groupBy(
window(col("event_time"), "5 minutes"),
col("action")
)
.agg(count("*").as("count"));
// Stream-static join with a user profile lookup table (stream-static joins do not require a watermark; joinedDF is shown for illustration and not started below)
Dataset<Row> userProfiles = spark.read().json("data/user_profiles.json");
Dataset<Row> joinedDF = watermarkedDF
.join(userProfiles, "user_id");
StreamingQuery query = windowedCounts
.writeStream()
.outputMode("update")
.format("console")
.start();
query.awaitTermination();
}
}
3. Stream-Stream Joins
public class StreamStreamJoin {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("StreamStreamJoin")
.master("local[2]")
.getOrCreate();
// First stream: user registrations
Dataset<Row> registrations = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9998)
.load()
.select(
split(col("value"), ",").getItem(0).as("user_id"),
to_timestamp(split(col("value"), ",").getItem(1)).as("registration_time")
)
.withWatermark("registration_time", "1 minute");
// Second stream: user activities
Dataset<Row> activities = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.select(
split(col("value"), ",").getItem(0).as("user_id"),
split(col("value"), ",").getItem(1).as("activity"),
to_timestamp(split(col("value"), ",").getItem(2)).as("activity_time")
)
.withWatermark("activity_time", "2 minutes");
// Inner join with time range condition
Dataset<Row> joined = registrations
.join(
activities,
registrations.col("user_id").equalTo(activities.col("user_id"))
.and(activities.col("activity_time")
.gt(registrations.col("registration_time")))
.and(activities.col("activity_time")
.lt(registrations.col("registration_time").plus(expr("INTERVAL 1 HOUR")))),
"inner"
)
.select(
registrations.col("user_id"),
registrations.col("registration_time"),
activities.col("activity"),
activities.col("activity_time")
);
StreamingQuery query = joined
.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
}
}
4. Stateful Processing with mapGroupsWithState
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class StatefulProcessing {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("StatefulProcessing")
.master("local[2]")
.getOrCreate();
Dataset<Row> eventsDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.select(
split(col("value"), ",").getItem(0).as("session_id"),
split(col("value"), ",").getItem(1).as("event_type"),
to_timestamp(split(col("value"), ",").getItem(2)).as("event_time")
)
.withWatermark("event_time", "10 minutes");
// Stateful processing for session tracking
Dataset<SessionUpdate> sessionUpdates = eventsDF
    .groupByKey((MapFunction<Row, String>) row -> row.getString(0), Encoders.STRING())
    .mapGroupsWithState(
        new SessionTrackingFunction(),
        Encoders.bean(SessionInfo.class),
        Encoders.bean(SessionUpdate.class),
        GroupStateTimeout.EventTimeTimeout()
    );
StreamingQuery query = sessionUpdates
.writeStream()
.outputMode("update")
.format("console")
.start();
query.awaitTermination();
}
// User session state
public static class SessionInfo implements Serializable {
private String sessionId;
private long startTime;
private long endTime;
private int eventCount;
private List<String> eventTypes;
// Constructors, getters, setters
public SessionInfo() {}
public SessionInfo(String sessionId, long startTime) {
this.sessionId = sessionId;
this.startTime = startTime;
this.endTime = startTime;
this.eventCount = 0; // incremented by update() for each event
this.eventTypes = new ArrayList<>();
}
public void update(long timestamp, String eventType) {
this.endTime = Math.max(this.endTime, timestamp);
this.eventCount++;
this.eventTypes.add(eventType);
}
public long duration() {
return endTime - startTime;
}
// Getters and setters for all fields (required by Encoders.bean) omitted for brevity
}
// Session update output
public static class SessionUpdate implements Serializable {
private String sessionId;
private long duration;
private int eventCount;
private String status;
// Constructors, getters, setters...
public SessionUpdate() {}
public SessionUpdate(String sessionId, long duration, int eventCount, String status) {
this.sessionId = sessionId;
this.duration = duration;
this.eventCount = eventCount;
this.status = status;
}
}
// Stateful function
static class SessionTrackingFunction
        implements MapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> {
@Override
public SessionUpdate call(String key, Iterator<Row> values, GroupState<SessionInfo> state)
throws Exception {
SessionInfo currentState = state.exists() ? state.get() :
new SessionInfo(key, System.currentTimeMillis());
while (values.hasNext()) {
Row row = values.next();
String eventType = row.getString(1);
long eventTime = row.getTimestamp(2).getTime();
currentState.update(eventTime, eventType);
}
// Expire the session after 1 hour of event-time inactivity
if (state.hasTimedOut()) {
    SessionUpdate update = new SessionUpdate(
        key, currentState.duration(), currentState.getEventCount(), "expired"
    );
    state.remove();
    return update;
} else {
    state.update(currentState);
    // With EventTimeTimeout the timeout is an event-time timestamp
    // (setTimeoutDuration is only valid for ProcessingTimeTimeout)
    state.setTimeoutTimestamp(currentState.getEndTime(), "1 hour");
    return new SessionUpdate(
        key, currentState.duration(), currentState.getEventCount(), "active"
    );
}
}
}
}
Error Handling and Monitoring
1. Query Management and Monitoring
public class QueryManagement {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("QueryManagement")
.master("local[2]")
.getOrCreate();
Dataset<Row> streamDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
Dataset<Row> resultDF = streamDF
.groupBy(col("value"))
.count();
StreamingQuery query = resultDF
.writeStream()
.outputMode("complete")
.format("console")
.option("checkpointLocation", "data/checkpoint/")
.start();
// Monitor the query
new Thread(() -> {
while (query.isActive()) {
try {
Thread.sleep(5000); // Check every 5 seconds
// Get query status
System.out.println("Query Name: " + query.name());
System.out.println("Query ID: " + query.id());
System.out.println("Run ID: " + query.runId());
System.out.println("Status: " + query.status());
System.out.println("Recent Progress: " + query.lastProgress());
// Check for errors (exception() returns a Scala Option)
if (query.exception().isDefined()) {
    System.err.println("Query exception: " + query.exception().get());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
// Add shutdown hook for graceful termination
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down streaming query...");
    try {
        query.stop();
    } catch (Exception e) {
        System.err.println("Failed to stop query cleanly: " + e.getMessage());
    }
}));
query.awaitTermination();
}
}
2. Fault Tolerance with Checkpointing
public class CheckpointExample {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("CheckpointExample")
.master("local[2]")
.getOrCreate();
Dataset<Row> streamDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "important-events")
.option("startingOffsets", "latest")
.load();
Dataset<Row> processedDF = processStream(streamDF);
// With checkpointing for fault tolerance
StreamingQuery query = processedDF
.writeStream()
.outputMode("update")
.format("console")
.option("checkpointLocation", "data/checkpoint/fault_tolerant/")
.start();
query.awaitTermination();
}
private static Dataset<Row> processStream(Dataset<Row> df) {
return df
.select(
from_json(col("value").cast("string"),
        "event_id string, user_id string, event_type string, " +
        "timestamp timestamp, data string",
        new java.util.HashMap<>()  // no extra JSON parsing options
).as("event")
)
.select(
col("event.event_id"),
col("event.user_id"),
col("event.event_type"),
col("event.timestamp"),
col("event.data")
)
.withWatermark("timestamp", "5 minutes")
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("event_type")
)
.agg(
count("*").as("event_count"),
approx_count_distinct("user_id").as("unique_users") // exact countDistinct is unsupported in streaming aggregations
);
}
}
Performance Tuning
1. Optimization Techniques
public class PerformanceTuning {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("PerformanceTuning")
.master("local[4]") // Use more cores
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skew.enabled", "true")
.config("spark.sql.streaming.metricsEnabled", "true")
.config("spark.sql.shuffle.partitions", "8") // Adjust based on data size
.config("spark.streaming.backpressure.enabled", "true") // Enable backpressure
.config("spark.streaming.kafka.maxRatePerPartition", "1000") // Control ingestion rate
.getOrCreate();
Dataset<Row> kafkaDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-events")
.option("maxOffsetsPerTrigger", "10000") // Control batch size
.load();
// Efficient processing with partitioning
Dataset<Row> processedDF = kafkaDF
.repartition(8) // Explicit repartitioning
.select(
from_json(col("value").cast("string"),
        "user_id string, event_time timestamp, data string",
        new java.util.HashMap<>()  // no extra JSON parsing options
).as("event")
)
.select(
col("event.user_id"),
col("event.event_time"),
col("event.data")
)
.withWatermark("event_time", "1 minute");
// Use efficient aggregations
Dataset<Row> aggregatedDF = processedDF
.groupBy(
window(col("event_time"), "5 minutes"),
col("user_id")
)
.agg(
count("*").as("event_count"),
first("event_time").as("first_event"),
last("event_time").as("last_event")
)
.select(
col("window.start").as("window_start"),
col("user_id"),
col("event_count"),
col("first_event"),
col("last_event")
);
StreamingQuery query = aggregatedDF
.writeStream()
.outputMode("update")
.format("console")
.option("checkpointLocation", "data/checkpoint/performance/")
.trigger(Trigger.ProcessingTime("2 seconds")) // Adjust trigger interval
.start();
query.awaitTermination();
}
}
Real-World Use Cases
1. Real-time Analytics Dashboard
public class RealTimeAnalytics {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("RealTimeAnalytics")
.master("local[2]")
.getOrCreate();
// Read user interaction events
Dataset<Row> eventsDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-interactions")
.load()
.select(
from_json(col("value").cast("string"),
        "user_id string, page_id string, action string, " +
        "timestamp timestamp, session_id string, duration int",
        new java.util.HashMap<>()  // no extra JSON parsing options
).as("event")
)
.select(
col("event.*")
)
.withWatermark("timestamp", "5 minutes");
// Real-time metrics
Dataset<Row> realTimeMetrics = eventsDF
.groupBy(
window(col("timestamp"), "1 minute", "30 seconds")
)
.agg(
count("*").as("total_events"),
countDistinct("user_id").as("active_users"),
countDistinct("session_id").as("active_sessions"),
sum(when(col("action").equalTo("click"), 1).otherwise(0)).as("clicks"),
sum(when(col("action").equalTo("view"), 1).otherwise(0)).as("views"),
avg("duration").as("avg_session_duration")
)
.select(
col("window.start").as("timestamp"),
col("total_events"),
col("active_users"),
col("active_sessions"),
col("clicks"),
col("views"),
col("avg_session_duration")
);
// Write to multiple sinks for different consumers
StreamingQuery consoleQuery = realTimeMetrics
.writeStream()
.outputMode("update")
.format("console")
.option("truncate", "false")
.start();
StreamingQuery kafkaQuery = realTimeMetrics
.select(
lit("metrics").as("key"),
to_json(struct(col("*"))).as("value")
)
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "real-time-metrics")
.option("checkpointLocation", "data/checkpoint/analytics/")
.start();
// Wait for any query to terminate
spark.streams().awaitAnyTermination();
}
}
Best Practices
- Checkpointing: Always use checkpointing for fault tolerance
- Watermarking: Use watermarks for handling late data in aggregations
- Partitioning: Properly partition data for parallel processing
- Monitoring: Implement comprehensive monitoring and alerting (see the listener sketch after this list)
- Backpressure: Enable backpressure for rate limiting
- Resource Management: Tune Spark configurations based on workload
- Testing: Test streaming applications with realistic data volumes
- Graceful Shutdown: Implement proper shutdown hooks
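For the monitoring point above, Spark also exposes a StreamingQueryListener that can be registered on the session and wired into whatever metrics or alerting system you use. A minimal sketch follows; the println calls are stand-ins for real metric or alert emitters.
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Minimal monitoring sketch: push query lifecycle and progress events somewhere useful
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }
    @Override
    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
        // progress() includes batch duration, input/processing rates, and state metrics
        System.out.println("Progress: " + event.progress().prettyJson());
    }
    @Override
    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) {
        System.out.println("Query terminated: " + event.id());
    }
});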
Spark Structured Streaming provides a powerful, scalable platform for real-time data processing, with built-in fault tolerance and end-to-end exactly-once semantics when the source is replayable and the sink is idempotent or transactional.
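When the sink is not transactional, a common way to make writes idempotent with foreachBatch is to key the output by the batch id, so that a replayed micro-batch overwrites its own output instead of duplicating it. The sketch below reuses a processedDF like the ones above; the output path and checkpoint location are placeholders.
// Idempotent foreachBatch sketch: a replayed batch rewrites the same batch_id
// directory rather than appending duplicates
StreamingQuery query = processedDF
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", "data/checkpoint/exactly_once/")
        .foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
            batchDF.write()
                    .mode("overwrite")
                    .json("data/exactly-once-output/batch_id=" + batchId);
        })
        .start();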