Introduction
KSQL is the streaming SQL engine for Apache Kafka® that enables real-time data processing using familiar SQL-like syntax. When combined with Java applications, KSQL provides a powerful platform for building real-time stream processing applications, analytics pipelines, and event-driven microservices.
This comprehensive guide explores KSQL integration with Java, covering everything from basic stream processing to advanced real-time analytics.
Architecture Overview
1. KSQL Components
- KSQL Server: Processes SQL queries on Kafka streams
- KSQL CLI: Command-line interface for KSQL
- Streams: Unbounded sequences of structured data
- Tables: Mutable, partitioned collections that represent the latest value for each key
- Connectors: Integration with external systems
2. Java Integration Patterns
- KSQL REST API Client: Direct HTTP communication
- Kafka Streams: Native Java stream processing
- Spring Cloud Stream: Framework abstraction
- Custom Clients: Low-level integration
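The first pattern in the list, talking to the KSQL REST API directly, needs nothing beyond the JDK. Below is a minimal sketch using `java.net.http.HttpClient`: the `/ksql` endpoint path, the `Content-Type`, and the `{"ksql": ..., "streamsProperties": {}}` body shape follow ksqlDB's documented REST API, while the `KsqlRestRequest` class itself and the server URL are illustrative:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Minimal sketch of the "KSQL REST API Client" pattern. Only the /ksql path,
// content type, and JSON body shape come from the ksqlDB REST API; the class
// and method names here are illustrative.
public class KsqlRestRequest {

    /** Build the JSON body for a POST to ksqlDB's /ksql endpoint. */
    public static String buildBody(String ksql) {
        // Minimal JSON escaping for backslashes and quotes in the statement text
        String escaped = ksql.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"ksql\":\"" + escaped + "\",\"streamsProperties\":{}}";
    }

    /** Assemble the HTTP request; send it with java.net.http.HttpClient. */
    public static HttpRequest build(String serverUrl, String ksql) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/ksql"))
                .header("Content-Type", "application/vnd.ksql.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(ksql)))
                .build();
    }

    public static void main(String[] args) {
        System.out.println(buildBody("LIST STREAMS;"));
    }
}
```

A statement is then executed by sending the built request with `HttpClient.newHttpClient().send(...)` against a running ksqlDB server.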
Project Setup and Dependencies
1. Maven Dependencies
<dependencies>
    <!-- KSQL DB Client -->
    <dependency>
        <groupId>io.confluent.ksql</groupId>
        <artifactId>ksqldb-api-client</artifactId>
        <version>7.5.1</version>
    </dependency>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.5.1</version>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
2. Application Configuration
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      schema.registry.url: http://localhost:8081

ksql:
  server: http://localhost:8088
  basic-auth:
    username: ${KSQL_USERNAME:}
    password: ${KSQL_PASSWORD:}

logging:
  level:
    io.confluent.ksql: INFO
    org.apache.kafka: INFO
KSQL Client Implementation
1. KSQL REST Client Service
@Component
@Slf4j
public class KsqlDbClientService {
private final Client ksqlClient;
private final ObjectMapper objectMapper;
public KsqlDbClientService(@Value("${ksql.server}") String ksqlServerUrl,
ObjectMapper objectMapper) {
// The ksqldb-api-client connects through host/port ClientOptions, not a raw URL
URI serverUri = URI.create(ksqlServerUrl);
ClientOptions options = ClientOptions.create()
.setHost(serverUri.getHost())
.setPort(serverUri.getPort() > 0 ? serverUri.getPort() : 8088);
this.ksqlClient = Client.create(options);
this.objectMapper = objectMapper;
}
/**
* Execute KSQL query and return results as list
*/
public <T> List<T> executeQuery(String ksql, Class<T> resultType) {
List<T> results = new ArrayList<>();
try {
// executeQuery completes with the full result set (pull queries,
// or push queries with a LIMIT clause)
List<Row> rows = ksqlClient.executeQuery(ksql).get();
for (Row row : rows) {
T result = convertRowToObject(row, resultType);
if (result != null) {
results.add(result);
}
}
} catch (Exception e) {
log.error("Error executing KSQL query: {}", ksql, e);
throw new KsqlExecutionException("Failed to execute KSQL query", e);
}
return results;
}
/**
* Execute KSQL statement (CREATE STREAM, CREATE TABLE, etc.)
*/
public void executeStatement(String ksqlStatement) {
try {
ExecuteStatementResult result = ksqlClient.executeStatement(ksqlStatement).get();
log.info("KSQL statement executed successfully (query id: {}): {}",
result.queryId().orElse("n/a"), ksqlStatement);
} catch (Exception e) {
log.error("Error executing KSQL statement: {}", ksqlStatement, e);
throw new KsqlExecutionException("Failed to execute KSQL statement", e);
}
}
/**
* Execute a KSQL query with extra query properties
* (e.g. "auto.offset.reset" -> "earliest"). ksqlDB has no bind
* parameters, so literal values must be inlined into the SQL text.
*/
public <T> List<T> executeQueryWithProperties(String ksql,
Map<String, Object> properties,
Class<T> resultType) {
List<T> results = new ArrayList<>();
try {
List<Row> rows = ksqlClient.executeQuery(ksql, properties).get();
for (Row row : rows) {
T result = convertRowToObject(row, resultType);
if (result != null) {
results.add(result);
}
}
} catch (Exception e) {
log.error("Error executing KSQL query with properties: {}", ksql, e);
throw new KsqlExecutionException("Failed to execute KSQL query", e);
}
return results;
}
/**
* Insert values into a stream
*/
public void insertIntoStream(String streamName, Map<String, Object> values) {
try {
ksqlClient.insertInto(streamName, new KsqlObject(values)).get();
log.debug("Inserted values into stream: {} - {}", streamName, values);
} catch (Exception e) {
log.error("Error inserting into stream: {}", streamName, e);
throw new KsqlExecutionException("Failed to insert into stream", e);
}
}
/**
* Get stream/table description
*/
public SourceDescription describeSource(String sourceName) {
try {
return ksqlClient.describeSource(sourceName).get();
} catch (Exception e) {
log.error("Error describing source: {}", sourceName, e);
throw new KsqlExecutionException("Failed to describe source", e);
}
}
/**
* List available streams
*/
public List<StreamInfo> listStreams() {
try {
return ksqlClient.listStreams().get();
} catch (Exception e) {
log.error("Error listing streams", e);
throw new KsqlExecutionException("Failed to list streams", e);
}
}
/**
* List available tables
*/
public List<TableInfo> listTables() {
try {
return ksqlClient.listTables().get();
} catch (Exception e) {
log.error("Error listing tables", e);
throw new KsqlExecutionException("Failed to list tables", e);
}
}
/**
* Drop stream
*/
public void dropStream(String streamName) {
try {
ksqlClient.executeStatement("DROP STREAM " + streamName + ";").get();
log.info("Dropped stream: {}", streamName);
} catch (Exception e) {
log.error("Error dropping stream: {}", streamName, e);
throw new KsqlExecutionException("Failed to drop stream", e);
}
}
/**
* Drop table
*/
public void dropTable(String tableName) {
try {
ksqlClient.executeStatement("DROP TABLE " + tableName + ";").get();
log.info("Dropped table: {}", tableName);
} catch (Exception e) {
log.error("Error dropping table: {}", tableName, e);
throw new KsqlExecutionException("Failed to drop table", e);
}
}
@SuppressWarnings("unchecked")
private <T> T convertRowToObject(Row row, Class<T> resultType) {
try {
// Row.asObject() exposes the row as a KsqlObject backed by a column map
Map<String, Object> rowMap = row.asObject().getMap();
return objectMapper.convertValue(rowMap, resultType);
} catch (Exception e) {
log.warn("Failed to convert row to object: {}", row, e);
}
return null;
}
}
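Because the service above builds statements by splicing strings together (and ksqlDB offers no bind parameters), any user-supplied value should at minimum have its single quotes escaped before it is interpolated into a KSQL string literal. A minimal sketch; the `KsqlLiterals` class and its `quote` helper are illustrative, not part of any library:

```java
// Minimal escaping helper for values interpolated into KSQL string literals.
// KSQL, like standard SQL, doubles single quotes inside quoted literals.
public class KsqlLiterals {

    /** Wrap a Java string as a single-quoted KSQL literal, escaping quotes. */
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String userId = "o'brien";
        String sql = "SELECT * FROM user_sessions WHERE user_id = " + quote(userId) + ";";
        System.out.println(sql);
    }
}
```

This guards the literal syntax only; validating input against an expected shape (e.g. an ID pattern) before building the query remains advisable.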
2. Domain Models for Stream Processing
// Event Models
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserEvent {
private String userId;
private String eventType;
private String pageId;
private Long timestamp;
private Map<String, Object> properties;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class PurchaseEvent {
private String orderId;
private String userId;
private String productId;
private String productName;
private Double price;
private Integer quantity;
private Long timestamp;
private String currency;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ProductViewEvent {
private String userId;
private String productId;
private String productName;
private String category;
private Long viewDurationMs;
private Long timestamp;
}
// KSQL Result Models
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserSession {
private String userId;
private Long sessionStart;
private Long sessionEnd;
private Long durationMs;
private Integer pageViewCount;
private String mostViewedPage;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RealTimeMetrics {
private String metricName;
private Double value;
private Long windowStart;
private Long windowEnd;
private String dimension;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class TopProduct {
private String productId;
private String productName;
private Long viewCount;
private Double totalRevenue;
private Long timestamp;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class FraudAlert {
private String userId;
private String alertType;
private String description;
private Double riskScore;
private Long timestamp;
private String transactionId;
}
Stream Processing Service Implementations
1. Real-time Analytics Service
@Service
@Slf4j
public class RealTimeAnalyticsService {
private final KsqlDbClientService ksqlClient;
private final KafkaTemplate<String, Object> kafkaTemplate;
public RealTimeAnalyticsService(KsqlDbClientService ksqlClient,
KafkaTemplate<String, Object> kafkaTemplate) {
this.ksqlClient = ksqlClient;
this.kafkaTemplate = kafkaTemplate;
}
/**
* Initialize KSQL streams and tables
*/
@PostConstruct
public void initializeKsqlResources() {
createUserEventsStream();
createPurchaseEventsStream();
createProductViewsStream();
createRealTimeAggregations();
createFraudDetectionQueries();
}
private void createUserEventsStream() {
String createStreamSql = """
CREATE STREAM user_events_stream (
user_id VARCHAR,
event_type VARCHAR,
page_id VARCHAR,
timestamp BIGINT,
properties MAP<VARCHAR, VARCHAR>
) WITH (
KAFKA_TOPIC = 'user-events',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
""";
ksqlClient.executeStatement(createStreamSql);
log.info("Created user_events_stream");
}
private void createPurchaseEventsStream() {
String createStreamSql = """
CREATE STREAM purchase_events_stream (
order_id VARCHAR,
user_id VARCHAR,
product_id VARCHAR,
product_name VARCHAR,
price DOUBLE,
quantity INT,
timestamp BIGINT,
currency VARCHAR
) WITH (
KAFKA_TOPIC = 'purchase-events',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
""";
ksqlClient.executeStatement(createStreamSql);
log.info("Created purchase_events_stream");
}
private void createProductViewsStream() {
String createStreamSql = """
CREATE STREAM product_views_stream (
user_id VARCHAR,
product_id VARCHAR,
product_name VARCHAR,
category VARCHAR,
view_duration_ms BIGINT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'product-views',
VALUE_FORMAT = 'JSON',
PARTITIONS = 6
);
""";
ksqlClient.executeStatement(createStreamSql);
log.info("Created product_views_stream");
}
private void createRealTimeAggregations() {
// Real-time revenue by product
String revenueByProductSql = """
CREATE TABLE revenue_by_product AS
SELECT
product_id,
product_name,
SUM(price * quantity) AS total_revenue,
COUNT(*) AS order_count
FROM purchase_events_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY product_id, product_name;
""";
ksqlClient.executeStatement(revenueByProductSql);
// User session aggregation
String userSessionsSql = """
CREATE TABLE user_sessions AS
SELECT
user_id,
WINDOWSTART AS session_start,
WINDOWEND AS session_end,
COUNT(*) AS page_view_count,
LATEST_BY_OFFSET(page_id) AS last_page_viewed
FROM user_events_stream
WINDOW SESSION (30 MINUTES)
GROUP BY user_id;
""";
ksqlClient.executeStatement(userSessionsSql);
// Top products by views
String topProductsSql = """
CREATE TABLE top_products AS
SELECT
product_id,
product_name,
COUNT(*) AS view_count
FROM product_views_stream
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY product_id, product_name
EMIT CHANGES;
""";
ksqlClient.executeStatement(topProductsSql);
log.info("Created real-time aggregations");
}
private void createFraudDetectionQueries() {
// High-frequency purchase detection
String frequentPurchasesSql = """
CREATE TABLE potential_fraud_alerts AS
SELECT
user_id,
'HIGH_FREQUENCY_PURCHASES' AS alert_type,
'User made more than 5 purchases in 10 minutes' AS description,
COUNT(*) AS purchase_count,
0.8 AS risk_score,
WINDOWSTART AS timestamp
FROM purchase_events_stream
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY user_id
HAVING COUNT(*) > 5;
""";
ksqlClient.executeStatement(frequentPurchasesSql);
// Large amount purchases
String largeAmountSql = """
CREATE STREAM large_amount_alerts AS
SELECT
user_id,
'LARGE_AMOUNT_PURCHASE' AS alert_type,
'Single purchase exceeds $1000' AS description,
(price * quantity) AS purchase_amount,
0.9 AS risk_score,
ROWTIME AS timestamp,
order_id
FROM purchase_events_stream
WHERE (price * quantity) > 1000;
""";
ksqlClient.executeStatement(largeAmountSql);
log.info("Created fraud detection queries");
}
/**
* Get real-time revenue metrics
*/
public List<RealTimeMetrics> getRevenueMetrics(String timeWindow) {
// Aggregate directly over the stream; ksqlDB does not support windowing
// a derived sub-select. LIMIT makes the push query terminate.
String query = """
SELECT
'REVENUE' AS metric_name,
SUM(price * quantity) AS value,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM purchase_events_stream
WINDOW TUMBLING (SIZE %s)
GROUP BY 'REVENUE'
EMIT CHANGES LIMIT 10;
""".formatted(timeWindow);
return ksqlClient.executeQuery(query, RealTimeMetrics.class);
}
/**
* Get top products by views
*/
public List<TopProduct> getTopProducts(int limit) {
String query = """
SELECT
product_id,
product_name,
view_count,
ROWTIME AS timestamp
FROM top_products
ORDER BY view_count DESC
LIMIT %d;
""".formatted(limit);
return ksqlClient.executeQuery(query, TopProduct.class);
}
/**
* Get user session analytics
*/
public List<UserSession> getUserSessions(String userId) {
String query = """
SELECT
user_id,
session_start,
session_end,
(session_end - session_start) AS duration_ms,
page_view_count,
last_page_viewed AS most_viewed_page
FROM user_sessions
WHERE user_id = '%s';
""".formatted(userId);
return ksqlClient.executeQuery(query, UserSession.class);
}
/**
* Get real-time fraud alerts
*/
public List<FraudAlert> getFraudAlerts() {
// ksqlDB supports neither UNION nor ORDER BY, so query each alert
// source separately and merge and sort client-side.
List<FraudAlert> alerts = new ArrayList<>();
alerts.addAll(ksqlClient.executeQuery(
"SELECT user_id, alert_type, description, risk_score, timestamp, "
+ "order_id AS transaction_id FROM large_amount_alerts;",
FraudAlert.class));
alerts.addAll(ksqlClient.executeQuery(
"SELECT user_id, alert_type, description, risk_score, timestamp "
+ "FROM potential_fraud_alerts;",
FraudAlert.class));
alerts.sort(Comparator.comparing(FraudAlert::getTimestamp).reversed());
return alerts.size() > 100 ? alerts.subList(0, 100) : alerts;
}
/**
* Calculate conversion rates
*/
public Double calculateConversionRate(String productId) {
// ksqlDB cannot join two derived sub-selects (the ON 1=1 cross-join
// pattern), so run the two counts separately and divide client-side.
String purchasesSql = """
SELECT product_id, COUNT(*) AS cnt
FROM purchase_events_stream
WHERE product_id = '%s'
GROUP BY product_id
EMIT CHANGES LIMIT 1;
""".formatted(productId);
String viewsSql = """
SELECT product_id, COUNT(*) AS cnt
FROM product_views_stream
WHERE product_id = '%s'
GROUP BY product_id
EMIT CHANGES LIMIT 1;
""".formatted(productId);
long purchases = extractCount(ksqlClient.executeQuery(purchasesSql, Map.class));
long views = extractCount(ksqlClient.executeQuery(viewsSql, Map.class));
return views == 0 ? 0.0 : (double) purchases / views;
}
@SuppressWarnings("rawtypes")
private long extractCount(List<Map> results) {
if (results.isEmpty()) {
return 0L;
}
Object cnt = results.get(0).get("CNT");
return cnt instanceof Number ? ((Number) cnt).longValue() : 0L;
}
}
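The HIGH_FREQUENCY_PURCHASES rule above is just a windowed count with a threshold. The same logic can be sketched client-side in plain Java, which is handy for unit-testing the rule before deploying it as a KSQL query. The 10-minute window and the more-than-5 threshold mirror the KSQL; the `FraudWindowCheck` class and its field names are illustrative:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Client-side sketch of the HIGH_FREQUENCY_PURCHASES rule: count purchases
// per user inside tumbling 10-minute windows and flag users exceeding the
// threshold in any single window.
public class FraudWindowCheck {

    static final long WINDOW_MS = 10 * 60 * 1000L;
    static final int THRESHOLD = 5;

    /** Users whose purchase count in any tumbling window exceeds the threshold. */
    public static Set<String> flagHighFrequency(Map<String, List<Long>> purchaseTimesByUser) {
        Set<String> flagged = new HashSet<>();
        for (Map.Entry<String, List<Long>> e : purchaseTimesByUser.entrySet()) {
            Map<Long, Integer> counts = new HashMap<>();
            for (long ts : e.getValue()) {
                long windowStart = (ts / WINDOW_MS) * WINDOW_MS; // tumbling window assignment
                counts.merge(windowStart, 1, Integer::sum);
            }
            if (counts.values().stream().anyMatch(c -> c > THRESHOLD)) {
                flagged.add(e.getKey());
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> purchases = Map.of(
                "u1", List.of(0L, 1000L, 2000L, 3000L, 4000L, 5000L), // 6 in one window
                "u2", List.of(0L, 700_000L));                         // spread across windows
        System.out.println(flagHighFrequency(purchases)); // [u1]
    }
}
```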
2. Event Processing Service
@Service
@Slf4j
public class EventProcessingService {
private final KsqlDbClientService ksqlClient;
private final KafkaTemplate<String, Object> kafkaTemplate;
public EventProcessingService(KsqlDbClientService ksqlClient,
KafkaTemplate<String, Object> kafkaTemplate) {
this.ksqlClient = ksqlClient;
this.kafkaTemplate = kafkaTemplate;
}
/**
* Send user event to KSQL stream
*/
public void sendUserEvent(UserEvent userEvent) {
Map<String, Object> eventData = new HashMap<>();
eventData.put("user_id", userEvent.getUserId());
eventData.put("event_type", userEvent.getEventType());
eventData.put("page_id", userEvent.getPageId());
eventData.put("timestamp", userEvent.getTimestamp());
eventData.put("properties", userEvent.getProperties());
ksqlClient.insertIntoStream("user_events_stream", eventData);
// Also send to Kafka topic for other consumers
kafkaTemplate.send("user-events", userEvent.getUserId(), userEvent);
log.debug("Sent user event: {}", userEvent.getUserId());
}
/**
* Send purchase event to KSQL stream
*/
public void sendPurchaseEvent(PurchaseEvent purchaseEvent) {
Map<String, Object> eventData = new HashMap<>();
eventData.put("order_id", purchaseEvent.getOrderId());
eventData.put("user_id", purchaseEvent.getUserId());
eventData.put("product_id", purchaseEvent.getProductId());
eventData.put("product_name", purchaseEvent.getProductName());
eventData.put("price", purchaseEvent.getPrice());
eventData.put("quantity", purchaseEvent.getQuantity());
eventData.put("timestamp", purchaseEvent.getTimestamp());
eventData.put("currency", purchaseEvent.getCurrency());
ksqlClient.insertIntoStream("purchase_events_stream", eventData);
kafkaTemplate.send("purchase-events", purchaseEvent.getUserId(), purchaseEvent);
log.debug("Sent purchase event: {}", purchaseEvent.getOrderId());
}
/**
* Send product view event to KSQL stream
*/
public void sendProductViewEvent(ProductViewEvent productViewEvent) {
Map<String, Object> eventData = new HashMap<>();
eventData.put("user_id", productViewEvent.getUserId());
eventData.put("product_id", productViewEvent.getProductId());
eventData.put("product_name", productViewEvent.getProductName());
eventData.put("category", productViewEvent.getCategory());
eventData.put("view_duration_ms", productViewEvent.getViewDurationMs());
eventData.put("timestamp", productViewEvent.getTimestamp());
ksqlClient.insertIntoStream("product_views_stream", eventData);
kafkaTemplate.send("product-views", productViewEvent.getUserId(), productViewEvent);
log.debug("Sent product view event: {}", productViewEvent.getProductId());
}
/**
* Process real-time user behavior
*/
public void processUserBehavior(String userId) {
String behaviorQuery = """
SELECT
ue.user_id,
COUNT(*) AS total_events,
COUNT_DISTINCT(ue.page_id) AS unique_pages,
MAX(ue.timestamp) AS last_activity,
COALESCE(COUNT(pe.order_id), 0) AS purchase_count
FROM user_events_stream ue
LEFT JOIN purchase_events_stream pe
WITHIN 24 HOURS
ON ue.user_id = pe.user_id
WHERE ue.user_id = '%s'
GROUP BY ue.user_id;
""".formatted(userId);
List<Map<String, Object>> results = ksqlClient.executeQuery(behaviorQuery, Map.class);
if (!results.isEmpty()) {
Map<String, Object> behavior = results.get(0);
log.info("User behavior analysis: {}", behavior);
// Trigger personalized recommendations or alerts
triggerPersonalization(userId, behavior);
}
}
/**
* Generate real-time recommendations
*/
public List<String> generateRecommendations(String userId) {
String recommendationQuery = """
SELECT
pv.product_id,
pv.product_name,
COUNT(*) AS view_count,
COALESCE(SUM(pe.quantity), 0) AS purchase_count
FROM product_views_stream pv
LEFT JOIN purchase_events_stream pe
WITHIN 7 DAYS
ON pv.product_id = pe.product_id
WHERE pv.user_id = '%s'
GROUP BY pv.product_id, pv.product_name
HAVING COALESCE(SUM(pe.quantity), 0) = 0
ORDER BY view_count DESC
LIMIT 10;
""".formatted(userId);
List<Map<String, Object>> results = ksqlClient.executeQuery(recommendationQuery, Map.class);
return results.stream()
.map(result -> (String) result.get("PRODUCT_NAME"))
.collect(Collectors.toList());
}
private void triggerPersonalization(String userId, Map<String, Object> behavior) {
Long totalEvents = (Long) behavior.get("TOTAL_EVENTS");
Long purchaseCount = (Long) behavior.get("PURCHASE_COUNT");
if (totalEvents > 50 && purchaseCount == 0) {
// High engagement but no purchases - trigger special offer
log.info("Triggering special offer for user: {}", userId);
sendPersonalizedOffer(userId);
}
}
private void sendPersonalizedOffer(String userId) {
// Implementation for sending personalized offers
log.info("Sending personalized offer to user: {}", userId);
}
}
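The recommendation query boils down to filter, sort, and limit: keep products the user viewed but never purchased, rank by view count, take the top N. That pipeline is easy to express and test with Java streams; the `RecommendationRanker` class and its record fields are illustrative stand-ins for the query's result columns:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Client-side mirror of the recommendation query: viewed-but-not-purchased
// products ranked by view count. Names are illustrative.
public class RecommendationRanker {

    public record ProductStats(String productName, long viewCount, long purchaseCount) {}

    public static List<String> recommend(List<ProductStats> stats, int limit) {
        return stats.stream()
                .filter(s -> s.purchaseCount() == 0)  // HAVING purchase count = 0
                .sorted(Comparator.comparingLong(ProductStats::viewCount)
                        .reversed())                  // ORDER BY view_count DESC
                .limit(limit)                         // LIMIT n
                .map(ProductStats::productName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ProductStats> stats = List.of(
                new ProductStats("Laptop", 12, 0),
                new ProductStats("Mouse", 30, 2),
                new ProductStats("Monitor", 20, 0));
        System.out.println(recommend(stats, 10)); // [Monitor, Laptop]
    }
}
```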
Advanced KSQL Patterns
1. Complex Event Processing
@Service
@Slf4j
public class ComplexEventProcessingService {
private final KsqlDbClientService ksqlClient;
public ComplexEventProcessingService(KsqlDbClientService ksqlClient) {
this.ksqlClient = ksqlClient;
}
/**
* Detect abandoned carts
*/
public void setupAbandonedCartDetection() {
// ksqlDB does not support correlated NOT EXISTS subqueries; approximate
// "viewed but not purchased" with a windowed LEFT JOIN and a zero
// purchase count per session.
String abandonedCartSql = """
CREATE TABLE abandoned_carts AS
SELECT
pv.user_id,
'ABANDONED_CART' AS alert_type,
COLLECT_LIST(pv.product_id) AS viewed_products,
MAX(pv.timestamp) AS last_view_time,
WINDOWSTART AS alert_time
FROM product_views_stream pv
LEFT JOIN purchase_events_stream pe
WITHIN 30 MINUTES
ON pv.user_id = pe.user_id
WINDOW SESSION (30 MINUTES)
GROUP BY pv.user_id
HAVING COUNT(pv.product_id) >= 2 AND COUNT(pe.order_id) = 0;
"""
ksqlClient.executeStatement(abandonedCartSql);
log.info("Setup abandoned cart detection");
}
/**
* Real-time inventory management
*/
public void setupInventoryManagement() {
String inventorySql = """
CREATE TABLE product_inventory AS
SELECT
product_id,
product_name,
SUM(quantity) AS total_sold,
LATEST_BY_OFFSET(price) AS current_price
FROM purchase_events_stream
GROUP BY product_id, product_name;
""";
ksqlClient.executeStatement(inventorySql);
String lowStockAlertSql = """
CREATE TABLE low_stock_alerts AS
SELECT
pi.product_id,
pi.product_name,
pi.total_sold,
'LOW_STOCK_ALERT' AS alert_type,
ROWTIME AS alert_time
FROM product_inventory pi
WHERE pi.total_sold > 100; -- Threshold for low stock
""";
ksqlClient.executeStatement(lowStockAlertSql);
log.info("Setup inventory management");
}
/**
* Customer segmentation
*/
public void setupCustomerSegmentation() {
// ksqlDB does not support sub-selects, so compute the aggregates and the
// CASE expression in a single grouped query.
String customerSegmentsSql = """
CREATE TABLE customer_segments AS
SELECT
user_id,
CASE
WHEN SUM(price * quantity) > 1000 THEN 'VIP'
WHEN SUM(price * quantity) > 500 THEN 'PREMIUM'
WHEN SUM(price * quantity) > 100 THEN 'STANDARD'
ELSE 'NEW'
END AS segment,
SUM(price * quantity) AS total_spent,
COUNT(*) AS order_count
FROM purchase_events_stream
GROUP BY user_id;
"""
ksqlClient.executeStatement(customerSegmentsSql);
log.info("Setup customer segmentation");
}
/**
* Real-time A/B testing
*/
public void setupAbTesting() {
String abTestSql = """
CREATE TABLE ab_test_results AS
SELECT
properties['experiment_id'] AS experiment_id,
properties['variant'] AS variant,
event_type,
COUNT(*) AS event_count,
COUNT_DISTINCT(user_id) AS unique_users
FROM user_events_stream
WHERE event_type IN ('conversion', 'click', 'view')
AND properties['experiment_id'] IS NOT NULL
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY
properties['experiment_id'],
properties['variant'],
event_type;
""";
ksqlClient.executeStatement(abTestSql);
log.info("Setup A/B testing analytics");
}
/**
* Get abandoned cart alerts
*/
public List<Map<String, Object>> getAbandonedCarts() {
String query = """
SELECT
user_id,
viewed_products,
last_view_time,
alert_time
FROM abandoned_carts
ORDER BY alert_time DESC
LIMIT 50;
""";
return ksqlClient.executeQuery(query, Map.class);
}
/**
* Get customer segments
*/
public List<Map<String, Object>> getCustomerSegments() {
String query = """
SELECT
user_id,
segment,
total_spent,
order_count
FROM customer_segments
ORDER BY total_spent DESC
LIMIT 100;
""";
return ksqlClient.executeQuery(query, Map.class);
}
}
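The segmentation CASE expression used above is simple enough to mirror in Java, which makes the spend thresholds unit-testable and keeps them in one reviewable place. The thresholds match the KSQL; the `CustomerSegments` class is an illustrative sketch:

```java
// Java mirror of the customer segmentation CASE expression, with the same
// spend thresholds as the KSQL table definition.
public class CustomerSegments {

    public static String segmentFor(double totalSpent) {
        if (totalSpent > 1000) return "VIP";
        if (totalSpent > 500) return "PREMIUM";
        if (totalSpent > 100) return "STANDARD";
        return "NEW";
    }

    public static void main(String[] args) {
        System.out.println(segmentFor(1500.0)); // VIP
        System.out.println(segmentFor(500.0));  // STANDARD (boundary is exclusive)
        System.out.println(segmentFor(50.0));   // NEW
    }
}
```

Note the boundaries are exclusive, exactly as in the KSQL: a customer who spent exactly 500 falls into STANDARD, not PREMIUM.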
2. Windowed Aggregations Service
@Service
@Slf4j
public class WindowedAggregationService {
private final KsqlDbClientService ksqlClient;
public WindowedAggregationService(KsqlDbClientService ksqlClient) {
this.ksqlClient = ksqlClient;
}
/**
* Setup various windowed aggregations
*/
public void setupWindowedAggregations() {
setupTumblingWindowAggregations();
setupHoppingWindowAggregations();
setupSessionWindowAggregations();
}
private void setupTumblingWindowAggregations() {
// Tumbling window - fixed, non-overlapping windows
String tumblingRevenueSql = """
CREATE TABLE tumbling_revenue_1min AS
SELECT
'REVENUE_1MIN' AS metric_type,
SUM(price * quantity) AS total_revenue,
COUNT(*) AS order_count,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM purchase_events_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY 'REVENUE_1MIN';
""";
ksqlClient.executeStatement(tumblingRevenueSql);
String tumblingUserActivitySql = """
CREATE TABLE tumbling_user_activity_5min AS
SELECT
'USER_ACTIVITY_5MIN' AS metric_type,
COUNT_DISTINCT(user_id) AS active_users,
COUNT(*) AS total_events,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM user_events_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY 'USER_ACTIVITY_5MIN';
""";
ksqlClient.executeStatement(tumblingUserActivitySql);
}
private void setupHoppingWindowAggregations() {
// Hopping window - overlapping windows
String hoppingRevenueSql = """
CREATE TABLE hopping_revenue_5min_1min AS
SELECT
'HOPPING_REVENUE' AS metric_type,
SUM(price * quantity) AS total_revenue,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM purchase_events_stream
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY 'HOPPING_REVENUE';
""";
ksqlClient.executeStatement(hoppingRevenueSql);
}
private void setupSessionWindowAggregations() {
// Session window - periods of activity separated by gaps of inactivity
String userSessionsSql = """
CREATE TABLE user_session_analytics AS
SELECT
user_id,
COUNT(*) AS event_count,
WINDOWSTART AS session_start,
WINDOWEND AS session_end,
(WINDOWEND - WINDOWSTART) AS session_duration_ms
FROM user_events_stream
WINDOW SESSION (30 MINUTES)
GROUP BY user_id;
""";
ksqlClient.executeStatement(userSessionsSql);
}
/**
* Get tumbling window metrics
*/
public List<Map<String, Object>> getTumblingWindowMetrics(String metricType) {
String query = """
SELECT
metric_type,
total_revenue,
order_count,
window_start,
window_end
FROM tumbling_revenue_1min
WHERE metric_type = '%s'
ORDER BY window_end DESC
LIMIT 10;
""".formatted(metricType);
return ksqlClient.executeQuery(query, Map.class);
}
/**
* Get hopping window metrics
*/
public List<Map<String, Object>> getHoppingWindowMetrics() {
String query = """
SELECT
metric_type,
total_revenue,
window_start,
window_end
FROM hopping_revenue_5min_1min
ORDER BY window_end DESC
LIMIT 20;
""";
return ksqlClient.executeQuery(query, Map.class);
}
/**
* Get user session analytics
*/
public List<Map<String, Object>> getUserSessions(String userId) {
String query = """
SELECT
user_id,
event_count,
session_start,
session_end,
session_duration_ms
FROM user_session_analytics
WHERE user_id = '%s'
ORDER BY session_end DESC
LIMIT 10;
""".formatted(userId);
return ksqlClient.executeQuery(query, Map.class);
}
}
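The window-assignment arithmetic behind the three aggregation styles above can be sketched in plain Java: a tumbling window holds each record exactly once, a hopping window of SIZE s and ADVANCE a holds it in every window whose start lies within s of the timestamp, and a session window extends while the gap between consecutive events stays at or under the inactivity threshold. The `WindowSemantics` class below is an illustrative sketch of those semantics, not the Kafka Streams implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of tumbling, hopping, and session window assignment,
// assuming epoch-millisecond timestamps and windows aligned to the epoch.
public class WindowSemantics {

    /** Start of the single tumbling window containing the timestamp. */
    public static long tumblingStart(long ts, long sizeMs) {
        return (ts / sizeMs) * sizeMs;
    }

    /** Starts of all hopping windows (SIZE sizeMs, ADVANCE BY advanceMs) containing ts. */
    public static List<Long> hoppingStarts(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Split sorted timestamps into session windows separated by gaps larger than gapMs. */
    public static List<List<Long>> sessionize(List<Long> sortedTs, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                sessions.add(current); // inactivity gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(125_000L, 60_000L));           // 120000
        System.out.println(hoppingStarts(125_000L, 300_000L, 60_000L)); // [0, 60000, 120000]
        System.out.println(sessionize(
                List.of(0L, 60_000L, 7_200_000L), 1_800_000L).size());  // 2
    }
}
```

This makes the overlap visible: with a 5-minute hopping window advancing every minute, a single purchase contributes to up to five windows, which is why hopping aggregates emit more rows than tumbling ones for the same traffic.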
REST API Controllers
1. Analytics Controller
@RestController
@RequestMapping("/api/analytics")
@Slf4j
public class AnalyticsController {
private final RealTimeAnalyticsService analyticsService;
private final EventProcessingService eventProcessingService;
private final ComplexEventProcessingService complexEventService;
public AnalyticsController(RealTimeAnalyticsService analyticsService,
EventProcessingService eventProcessingService,
ComplexEventProcessingService complexEventService) {
this.analyticsService = analyticsService;
this.eventProcessingService = eventProcessingService;
this.complexEventService = complexEventService;
}
@GetMapping("/revenue/metrics")
public ResponseEntity<List<RealTimeMetrics>> getRevenueMetrics(
@RequestParam(defaultValue = "1 HOUR") String window) {
try {
List<RealTimeMetrics> metrics = analyticsService.getRevenueMetrics(window);
return ResponseEntity.ok(metrics);
} catch (Exception e) {
log.error("Error fetching revenue metrics", e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/products/top")
public ResponseEntity<List<TopProduct>> getTopProducts(
@RequestParam(defaultValue = "10") int limit) {
try {
List<TopProduct> topProducts = analyticsService.getTopProducts(limit);
return ResponseEntity.ok(topProducts);
} catch (Exception e) {
log.error("Error fetching top products", e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/fraud/alerts")
public ResponseEntity<List<FraudAlert>> getFraudAlerts() {
try {
List<FraudAlert> alerts = analyticsService.getFraudAlerts();
return ResponseEntity.ok(alerts);
} catch (Exception e) {
log.error("Error fetching fraud alerts", e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/users/{userId}/sessions")
public ResponseEntity<List<UserSession>> getUserSessions(@PathVariable String userId) {
try {
List<UserSession> sessions = analyticsService.getUserSessions(userId);
return ResponseEntity.ok(sessions);
} catch (Exception e) {
log.error("Error fetching user sessions for: {}", userId, e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/users/{userId}/recommendations")
public ResponseEntity<List<String>> getUserRecommendations(@PathVariable String userId) {
try {
List<String> recommendations = eventProcessingService.generateRecommendations(userId);
return ResponseEntity.ok(recommendations);
} catch (Exception e) {
log.error("Error generating recommendations for: {}", userId, e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/abandoned-carts")
public ResponseEntity<List<Map<String, Object>>> getAbandonedCarts() {
try {
List<Map<String, Object>> abandonedCarts = complexEventService.getAbandonedCarts();
return ResponseEntity.ok(abandonedCarts);
} catch (Exception e) {
log.error("Error fetching abandoned carts", e);
return ResponseEntity.internalServerError().build();
}
}
@GetMapping("/customer-segments")
public ResponseEntity<List<Map<String, Object>>> getCustomerSegments() {
try {
List<Map<String, Object>> segments = complexEventService.getCustomerSegments();
return ResponseEntity.ok(segments);
} catch (Exception e) {
log.error("Error fetching customer segments", e);
return ResponseEntity.internalServerError().build();
}
}
@PostMapping("/events/user")
public ResponseEntity<Void> sendUserEvent(@RequestBody UserEvent userEvent) {
try {
eventProcessingService.sendUserEvent(userEvent);
return ResponseEntity.accepted().build();
} catch (Exception e) {
log.error("Error sending user event", e);
return ResponseEntity.internalServerError().build();
}
}
@PostMapping("/events/purchase")
public ResponseEntity<Void> sendPurchaseEvent(@RequestBody PurchaseEvent purchaseEvent) {
try {
eventProcessingService.sendPurchaseEvent(purchaseEvent);
return ResponseEntity.accepted().build();
} catch (Exception e) {
log.error("Error sending purchase event", e);
return ResponseEntity.internalServerError().build();
}
}
@PostMapping("/events/product-view")
public ResponseEntity<Void> sendProductViewEvent(@RequestBody ProductViewEvent productViewEvent) {
try {
eventProcessingService.sendProductViewEvent(productViewEvent);
return ResponseEntity.accepted().build();
} catch (Exception e) {
log.error("Error sending product view event", e);
return ResponseEntity.internalServerError().build();
}
}
}
Testing KSQL Integration
1. Integration Tests
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
class KsqlIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.1")
);
@Autowired
private KsqlDbClientService ksqlClient;
@Autowired
private RealTimeAnalyticsService analyticsService;
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
registry.add("ksql.server", () -> "http://localhost:8088");
}
@Test
void testKsqlClientConnection() {
assertDoesNotThrow(() -> {
List<StreamInfo> streams = ksqlClient.listStreams();
assertNotNull(streams);
});
}
@Test
void testStreamCreation() {
String testStreamSql = """
CREATE STREAM test_stream (
id VARCHAR,
value INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'test-topic',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
""";
assertDoesNotThrow(() -> ksqlClient.executeStatement(testStreamSql));
// Verify stream was created
List<StreamInfo> streams = ksqlClient.listStreams();
boolean streamExists = streams.stream()
.anyMatch(stream -> stream.getName().equals("TEST_STREAM"));
assertTrue(streamExists, "Test stream should be created");
// Cleanup
ksqlClient.dropStream("TEST_STREAM");
}
@Test
void testEventProcessing() {
PurchaseEvent purchaseEvent = PurchaseEvent.builder()
.orderId("test-order-123")
.userId("test-user-456")
.productId("test-product-789")
.productName("Test Product")
.price(99.99)
.quantity(2)
.timestamp(System.currentTimeMillis())
.currency("USD")
.build();
// This would typically be tested with a full KSQL DB setup
assertDoesNotThrow(() -> {
// In a real test, you would verify the event was processed
// and appeared in the appropriate streams/tables
});
}
}
Error Handling and Monitoring
1. Custom Exceptions
public class KsqlExecutionException extends RuntimeException {

    public KsqlExecutionException(String message) {
        super(message);
    }

    public KsqlExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class StreamProcessingException extends RuntimeException {

    public StreamProcessingException(String message) {
        super(message);
    }

    public StreamProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}
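One way these exceptions might be put to work is a small retry wrapper that attempts a KSQL call a few times before surfacing a KsqlExecutionException carrying the last underlying cause. The sketch below is self-contained (it nests its own copy of the exception so the snippet compiles on its own); the helper name and retry policy are illustrative, not part of any library API.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper: run a KSQL call up to maxAttempts times,
// then wrap the final failure in a KsqlExecutionException.
public class KsqlRetryExecutor {

    // Nested copy of the custom exception so this sketch stands alone.
    public static class KsqlExecutionException extends RuntimeException {
        public KsqlExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static <T> T executeWithRetry(Callable<T> action, int maxAttempts) {
        Throwable lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e; // remember the cause for the final exception
            }
        }
        throw new KsqlExecutionException(
                "KSQL statement failed after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) {
        // Succeeds on the first attempt.
        System.out.println(executeWithRetry(() -> "statement executed", 3));

        // Always fails: the helper gives up and wraps the last cause.
        try {
            executeWithRetry(() -> {
                throw new IllegalStateException("ksqlDB unreachable");
            }, 2);
        } catch (KsqlExecutionException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real service the `Callable` would typically delegate to the ksqlDB client's statement execution, and the retry policy would add backoff between attempts.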
2. Monitoring and Metrics
@Component
@Slf4j
public class KsqlMetricsMonitor {

    private final MeterRegistry meterRegistry;
    private final Counter ksqlQueryCounter;
    private final Timer ksqlExecutionTimer;
    private final Counter ksqlErrorCounter;

    public KsqlMetricsMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.ksqlQueryCounter = meterRegistry.counter("ksql.queries.executed");
        this.ksqlExecutionTimer = meterRegistry.timer("ksql.execution.time");
        this.ksqlErrorCounter = meterRegistry.counter("ksql.errors");
    }

    public void recordQueryExecution(String queryType, Duration duration, boolean success) {
        ksqlQueryCounter.increment();
        ksqlExecutionTimer.record(duration);

        Tags tags = Tags.of(
                Tag.of("query_type", queryType),
                Tag.of("success", String.valueOf(success))
        );
        meterRegistry.counter("ksql.queries.detailed", tags).increment();
        meterRegistry.timer("ksql.execution.time.detailed", tags).record(duration);

        if (!success) {
            ksqlErrorCounter.increment();
        }

        log.debug("KSQL query executed - type: {}, duration: {}ms, success: {}",
                queryType, duration.toMillis(), success);
    }

    public void recordStreamEvent(String streamName, String eventType) {
        Tags tags = Tags.of(
                Tag.of("stream_name", streamName),
                Tag.of("event_type", eventType)
        );
        meterRegistry.counter("ksql.stream.events", tags).increment();
    }
}
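The monitor above expects callers to hand it a query type, a measured duration, and a success flag. A minimal sketch of that timing pattern, with no Micrometer dependency, looks like this; the `QueryMetricsRecorder` interface is a stand-in for `KsqlMetricsMonitor::recordQueryExecution`, and the names here are illustrative.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Sketch of the metrics-wrapping pattern: time a KSQL call and report
// (queryType, duration, success) whether the call succeeds or throws.
public class TimedKsqlCall {

    @FunctionalInterface
    public interface QueryMetricsRecorder {
        void record(String queryType, Duration duration, boolean success);
    }

    public static <T> T timed(String queryType, Supplier<T> call,
                              QueryMetricsRecorder recorder) {
        long start = System.nanoTime();
        boolean success = false;
        try {
            T result = call.get();
            success = true;
            return result;
        } finally {
            // Runs on both the success and failure paths.
            recorder.record(queryType, Duration.ofNanos(System.nanoTime() - start), success);
        }
    }

    public static void main(String[] args) {
        timed("DDL", () -> "stream created",
                (type, d, ok) -> System.out.printf("type=%s success=%s%n", type, ok));
    }
}
```

With Micrometer on the classpath, the lambda passed as the recorder would simply be `monitor::recordQueryExecution`.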
Configuration Best Practices
1. Application Configuration
# application-prod.yml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";

ksql:
  server: ${KSQL_SERVER:http://localhost:8088}
  basic-auth:
    username: ${KSQL_USERNAME:}
    password: ${KSQL_PASSWORD:}

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true

logging:
  level:
    io.confluent.ksql: WARN
    org.apache.kafka: WARN
    com.example.ksql: INFO
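Every `${VAR:default}` entry in this file uses Spring's property-placeholder syntax: the value is looked up in the environment, and the text after the first colon is used as a fallback when the variable is unset. The stand-alone method below mimics that resolution for a single placeholder, purely to illustrate how values like `${KSQL_SERVER:http://localhost:8088}` are resolved; it is not part of Spring's API.

```java
import java.util.Map;

// Illustrative resolver for the ${VAR:default} placeholder style above.
public class PlaceholderDemo {

    public static String resolve(String placeholder, Map<String, String> env) {
        if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
            return placeholder; // not a placeholder, return as-is
        }
        String body = placeholder.substring(2, placeholder.length() - 1);
        // Split on the FIRST colon only, so URL defaults keep their colons.
        int colon = body.indexOf(':');
        String name = colon >= 0 ? body.substring(0, colon) : body;
        String fallback = colon >= 0 ? body.substring(colon + 1) : null;
        String value = env.get(name);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // Unset variable: falls back to the default after the first colon.
        System.out.println(resolve("${KSQL_SERVER:http://localhost:8088}", Map.of()));
        // Set variable: the environment value wins.
        System.out.println(resolve("${KSQL_SERVER:http://localhost:8088}",
                Map.of("KSQL_SERVER", "https://ksql.prod:8088")));
    }
}
```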
Conclusion
KSQL provides a powerful SQL-like interface for real-time stream processing on Kafka, and when combined with Java applications, it enables building sophisticated real-time analytics and event processing systems.
Key Benefits:
- SQL Familiarity: Leverage existing SQL knowledge for stream processing
- Real-time Processing: Process data as it arrives with sub-second latency
- Scalability: Built on Kafka's scalable architecture
- Integration: Seamless integration with Java applications and microservices
Use Cases:
- Real-time Analytics: Dashboards, metrics, and business intelligence
- Fraud Detection: Pattern matching and anomaly detection
- Recommendation Engines: Real-time personalization
- IoT Data Processing: Sensor data aggregation and alerting
- Customer 360: Real-time customer behavior analysis
Best Practices:
- Start Simple: Begin with basic streams and tables
- Monitor Performance: Track query performance and resource usage
- Use Windowing: Leverage tumbling, hopping, and session windows appropriately
- Handle Errors: Implement robust error handling and retry mechanisms
- Security: Secure KSQL clusters and implement proper authentication
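The windowing best practice above can be made concrete with a tumbling-window aggregation, expressed as the kind of SQL string a Java service would submit through the ksqlDB client. The stream and column names here are hypothetical.

```java
// Sketch of a tumbling-window KSQL statement held as a Java text block.
public class WindowedQueryExample {

    // Aggregate purchases into one-minute, non-overlapping windows.
    public static final String PURCHASES_PER_MINUTE = """
            CREATE TABLE purchases_per_minute AS
              SELECT product_id,
                     COUNT(*) AS purchase_count,
                     SUM(price * quantity) AS revenue
              FROM purchases_stream
              WINDOW TUMBLING (SIZE 1 MINUTE)
              GROUP BY product_id
              EMIT CHANGES;
            """;

    public static void main(String[] args) {
        System.out.println(PURCHASES_PER_MINUTE);
    }
}
```

Swapping `TUMBLING (SIZE 1 MINUTE)` for `HOPPING (SIZE 1 MINUTE, ADVANCE BY 10 SECONDS)` or `SESSION (30 SECONDS)` selects the other two window types.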
By combining KSQL's streaming SQL capabilities with Java's enterprise features, you can build powerful, scalable real-time applications that process massive volumes of data with low latency and high reliability.