Introduction to Custom Collectors
Java Streams provide powerful collection operations through the Collectors utility class. However, sometimes you need specialized collection behavior that isn't covered by the built-in collectors. Custom collectors allow you to define your own reduction operations for complex scenarios.
1. Basic Custom Collector Structure
Understanding the Collector Interface
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
public class CustomCollectorBasics {
// The Collector interface has 5 components:
// 1. Supplier: () -> A - creates new container
// 2. Accumulator: (A, T) -> void - adds element to container
// 3. Combiner: (A, A) -> A - merges two containers
// 4. Finisher: (A) -> R - transforms container to result
// 5. Characteristics: Set of collector properties
public static class SimpleStringCollector
implements Collector<String, StringBuilder, String> {
// Supplier: creates new StringBuilder
@Override
public Supplier<StringBuilder> supplier() {
return StringBuilder::new;
}
// Accumulator: appends string to StringBuilder with separator
@Override
public BiConsumer<StringBuilder, String> accumulator() {
return (sb, str) -> {
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(str);
};
}
// Combiner: merges two StringBuilders
@Override
public BinaryOperator<StringBuilder> combiner() {
return (sb1, sb2) -> {
if (sb1.length() > 0 && sb2.length() > 0) {
sb1.append(", ");
}
return sb1.append(sb2);
};
}
// Finisher: converts StringBuilder to String
@Override
public Function<StringBuilder, String> finisher() {
return StringBuilder::toString;
}
// Characteristics: defines collector behavior
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // joining depends on encounter order, so UNORDERED must not be declared
}
}
public static void main(String[] args) {
List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "Diana");
// Using custom collector
String result = names.stream()
.collect(new SimpleStringCollector());
System.out.println("Combined names: " + result);
// Using Collector.of() for quick implementation
Collector<String, StringBuilder, String> quickCollector =
Collector.of(
StringBuilder::new, // supplier
(sb, s) -> sb.append(s).append("; "), // accumulator (leaves a trailing "; " after the last name)
StringBuilder::append, // combiner
StringBuilder::toString // finisher
);
String quickResult = names.stream().collect(quickCollector);
System.out.println("Quick collector result: " + quickResult);
}
}
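The separator bookkeeping in SimpleStringCollector can also be delegated to java.util.StringJoiner. StringJoiner only inserts the delimiter between elements, and its merge method fits the combiner slot directly. The following is a minimal sketch using Collector.of. The names JoinerCollectorSketch and joinWithComma are illustrative, not part of any library. The result is similar in spirit to Collectors.joining(", ").

```java
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collector;

final class JoinerCollectorSketch {

    // Joins strings with ", ". StringJoiner inserts the delimiter only
    // between elements, so no "is this the first element?" check is needed.
    // No characteristics are declared: joining depends on encounter order.
    static Collector<String, StringJoiner, String> joinWithComma() {
        return Collector.of(
            () -> new StringJoiner(", "), // supplier: fresh, empty joiner
            StringJoiner::add,            // accumulator: append one element
            StringJoiner::merge,          // combiner: append the right-hand joiner's content
            StringJoiner::toString);      // finisher: produce the final String
    }

    public static void main(String[] args) {
        List<String> names = List.of("Alice", "Bob", "Charlie", "Diana");
        System.out.println(names.stream().collect(joinWithComma()));
        // prints: Alice, Bob, Charlie, Diana
    }
}
```

StringJoiner.merge ignores an empty right-hand joiner. This means the combiner needs none of the emptiness checks that the hand-written StringBuilder combiner performs.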
Step-by-Step Collector Creation
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
public class StepByStepCollector {
// Example: Collector that finds both min and max in single pass
public static class MinMaxCollector<T>
implements Collector<T, MinMaxCollector.MinMaxAccumulator<T>, MinMaxCollector.MinMaxResult<T>> {
private final Comparator<? super T> comparator;
public MinMaxCollector(Comparator<? super T> comparator) {
this.comparator = comparator;
}
// Accumulator container class
public static class MinMaxAccumulator<T> {
// The comparator is passed in explicitly: a static nested class
// cannot read the enclosing collector's instance fields
private final Comparator<? super T> comparator;
private T min;
private T max;
private long count;
MinMaxAccumulator(Comparator<? super T> comparator) {
this.comparator = comparator;
}
void accumulate(T element) {
if (count == 0) {
min = max = element;
} else {
if (comparator.compare(element, min) < 0) {
min = element;
}
if (comparator.compare(element, max) > 0) {
max = element;
}
}
count++;
}
MinMaxAccumulator<T> combine(MinMaxAccumulator<T> other) {
if (other.count == 0) {
return this;
}
if (this.count == 0) {
return other;
}
if (comparator.compare(other.min, this.min) < 0) {
this.min = other.min;
}
if (comparator.compare(other.max, this.max) > 0) {
this.max = other.max;
}
this.count += other.count;
return this;
}
}
// Result class
public static class MinMaxResult<T> {
private final T min;
private final T max;
private final long count;
public MinMaxResult(T min, T max, long count) {
this.min = min;
this.max = max;
this.count = count;
}
public T getMin() { return min; }
public T getMax() { return max; }
public long getCount() { return count; }
public Optional<T> getMinOptional() { return Optional.ofNullable(min); }
public Optional<T> getMaxOptional() { return Optional.ofNullable(max); }
@Override
public String toString() {
return String.format("MinMaxResult{min=%s, max=%s, count=%d}", min, max, count);
}
}
@Override
public Supplier<MinMaxAccumulator<T>> supplier() {
return () -> new MinMaxAccumulator<>(comparator);
}
@Override
public BiConsumer<MinMaxAccumulator<T>, T> accumulator() {
return MinMaxAccumulator::accumulate;
}
@Override
public BinaryOperator<MinMaxAccumulator<T>> combiner() {
return MinMaxAccumulator::combine;
}
@Override
public Function<MinMaxAccumulator<T>, MinMaxResult<T>> finisher() {
return acc -> new MinMaxResult<>(acc.min, acc.max, acc.count);
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED);
}
}
public static <T> Collector<T, ?, MinMaxCollector.MinMaxResult<T>> minMax(Comparator<? super T> comparator) {
return new MinMaxCollector<>(comparator);
}
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(5, 2, 8, 1, 9, 3, 7);
MinMaxCollector.MinMaxResult<Integer> result = numbers.stream()
.collect(minMax(Integer::compareTo));
System.out.println("Numbers: " + numbers);
System.out.println("MinMax result: " + result);
System.out.println("Min: " + result.getMin());
System.out.println("Max: " + result.getMax());
System.out.println("Count: " + result.getCount());
// Test with empty stream
MinMaxCollector.MinMaxResult<Integer> emptyResult = Collections.<Integer>emptyList().stream()
.collect(minMax(Integer::compareTo));
System.out.println("\nEmpty stream result: " + emptyResult);
System.out.println("Min present: " + emptyResult.getMinOptional().isPresent());
System.out.println("Max present: " + emptyResult.getMaxOptional().isPresent());
}
}
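Since Java 12, the single-pass min/max case can also be expressed with built-in collectors. Collectors.teeing sends every element to two downstream collectors (here minBy and maxBy) and then merges their results. The following is a minimal sketch, assuming Java 12+. The MinMax holder and the method names are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class TeeingMinMaxSketch {

    // Small holder for the two results (hypothetical; a record would also work).
    static final class MinMax<T> {
        final Optional<T> min;
        final Optional<T> max;

        MinMax(Optional<T> min, Optional<T> max) {
            this.min = min;
            this.max = max;
        }

        @Override
        public String toString() {
            return "MinMax{min=" + min + ", max=" + max + "}";
        }
    }

    // One pass over the stream: every element goes to both minBy and maxBy,
    // and the merger combines the two Optionals. Requires Java 12+.
    static <T> MinMax<T> minMax(List<T> items, Comparator<T> cmp) {
        return items.stream().collect(Collectors.teeing(
            Collectors.minBy(cmp),
            Collectors.maxBy(cmp),
            (min, max) -> new MinMax<>(min, max)));
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(5, 2, 8, 1, 9, 3, 7);
        Comparator<Integer> cmp = Integer::compare;
        System.out.println(minMax(numbers, cmp));
        // prints: MinMax{min=Optional[1], max=Optional[9]}
    }
}
```

The trade-off is mostly one of style. teeing composes existing collectors, while the hand-written MinMaxCollector keeps min, max, and count together in a single accumulator object.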
2. Practical Custom Collectors
Statistical Collector
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
public class StatisticalCollectors {
// Collector for comprehensive statistics
public static class StatisticsCollector<T>
implements Collector<T, StatisticsCollector.StatsAccumulator<T>, StatisticsCollector.Statistics<T>> {
private final ToDoubleFunction<T> mapper;
private final Comparator<T> comparator;
public StatisticsCollector(ToDoubleFunction<T> mapper, Comparator<T> comparator) {
this.mapper = mapper;
this.comparator = comparator;
}
// Accumulator that maintains multiple statistics
public static class StatsAccumulator<T> {
private long count;
private double sum;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY; // not Double.MIN_VALUE, which is the smallest *positive* double
private T minElement;
private T maxElement;
private final ToDoubleFunction<T> mapper;
private final Comparator<T> comparator;
public StatsAccumulator(ToDoubleFunction<T> mapper, Comparator<T> comparator) {
this.mapper = mapper;
this.comparator = comparator;
}
void accumulate(T element) {
double value = mapper.applyAsDouble(element);
count++;
sum += value;
if (value < min) {
min = value;
minElement = element;
}
if (value > max) {
max = value;
maxElement = element;
}
}
StatsAccumulator<T> combine(StatsAccumulator<T> other) {
if (other.count == 0) return this;
if (this.count == 0) return other;
this.count += other.count;
this.sum += other.sum;
if (other.min < this.min) {
this.min = other.min;
this.minElement = other.minElement;
}
if (other.max > this.max) {
this.max = other.max;
this.maxElement = other.maxElement;
}
return this;
}
}
// Comprehensive statistics result
public static class Statistics<T> {
private final long count;
private final double sum;
private final double average;
private final double min;
private final double max;
private final T minElement;
private final T maxElement;
public Statistics(long count, double sum, double min, double max,
T minElement, T maxElement) {
this.count = count;
this.sum = sum;
this.average = count > 0 ? sum / count : 0.0;
this.min = min;
this.max = max;
this.minElement = minElement;
this.maxElement = maxElement;
}
// Getters
public long getCount() { return count; }
public double getSum() { return sum; }
public double getAverage() { return average; }
public double getMin() { return min; }
public double getMax() { return max; }
public T getMinElement() { return minElement; }
public T getMaxElement() { return maxElement; }
@Override
public String toString() {
return String.format(
"Statistics{count=%d, sum=%.2f, avg=%.2f, min=%.2f, max=%.2f, minElement=%s, maxElement=%s}",
count, sum, average, min, max, minElement, maxElement
);
}
}
@Override
public Supplier<StatsAccumulator<T>> supplier() {
return () -> new StatsAccumulator<>(mapper, comparator);
}
@Override
public BiConsumer<StatsAccumulator<T>, T> accumulator() {
return StatsAccumulator::accumulate;
}
@Override
public BinaryOperator<StatsAccumulator<T>> combiner() {
return StatsAccumulator::combine;
}
@Override
public Function<StatsAccumulator<T>, Statistics<T>> finisher() {
return acc -> new Statistics<>(
acc.count, acc.sum,
acc.count > 0 ? acc.min : 0.0,
acc.count > 0 ? acc.max : 0.0,
acc.minElement, acc.maxElement
);
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED);
}
}
// Factory method
public static <T> Collector<T, ?, StatisticsCollector.Statistics<T>> statistics(
ToDoubleFunction<T> mapper, Comparator<T> comparator) {
return new StatisticsCollector<>(mapper, comparator);
}
// Example data class
static class Product {
private final String name;
private final double price;
private final int quantity;
public Product(String name, double price, int quantity) {
this.name = name;
this.price = price;
this.quantity = quantity;
}
public String getName() { return name; }
public double getPrice() { return price; }
public int getQuantity() { return quantity; }
public double getTotalValue() { return price * quantity; }
@Override
public String toString() {
return String.format("Product{name='%s', price=%.2f, quantity=%d}", name, price, quantity);
}
}
public static void main(String[] args) {
List<Product> products = Arrays.asList(
new Product("Laptop", 999.99, 5),
new Product("Mouse", 29.99, 20),
new Product("Keyboard", 79.99, 15),
new Product("Monitor", 299.99, 8),
new Product("Tablet", 499.99, 12)
);
// Statistics on product prices
StatisticsCollector.Statistics<Product> priceStats = products.stream()
.collect(statistics(Product::getPrice, Comparator.comparing(Product::getPrice)));
System.out.println("=== Price Statistics ===");
System.out.println(priceStats);
// Statistics on total inventory value
StatisticsCollector.Statistics<Product> valueStats = products.stream()
.collect(statistics(Product::getTotalValue, Comparator.comparing(Product::getTotalValue)));
System.out.println("\n=== Inventory Value Statistics ===");
System.out.println(valueStats);
System.out.println("Most valuable product: " + valueStats.getMaxElement());
System.out.println("Least valuable product: " + valueStats.getMinElement());
// Using with primitive streams
double[] temperatures = {22.5, 18.3, 25.7, 16.8, 30.2, 19.5};
StatisticsCollector.Statistics<Double> tempStats = Arrays.stream(temperatures)
.boxed()
.collect(statistics(Double::doubleValue, Double::compare));
System.out.println("\n=== Temperature Statistics ===");
System.out.println(tempStats);
}
}
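The JDK already ships a collector for the purely numeric part of these statistics. Collectors.summarizingDouble returns a DoubleSummaryStatistics holding the count, sum, min, average, and max. It does not record which element produced the min or the max, and that is the extra information the custom collector above keeps. The sketch below uses hypothetical class and method names. It feeds in all-negative input, which is exactly the case a max initialized to Double.MIN_VALUE would get wrong.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

final class SummaryStatsSketch {

    // Built-in single-pass count/sum/min/average/max over double values.
    static DoubleSummaryStatistics summarize(List<Double> values) {
        return values.stream().collect(Collectors.summarizingDouble(Double::doubleValue));
    }

    public static void main(String[] args) {
        // All-negative input: the max must still be found correctly.
        DoubleSummaryStatistics s = summarize(List.of(-3.5, -1.25, -7.0));
        System.out.println(s.getMin() + " " + s.getMax() + " " + s.getCount());
        // prints: -7.0 -1.25 3
    }
}
```

For an empty stream, DoubleSummaryStatistics reports the min as Double.POSITIVE_INFINITY and the max as Double.NEGATIVE_INFINITY. Callers should therefore check getCount() before trusting either value.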
Grouping and Partitioning Collector
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
public class AdvancedGroupingCollectors {
// Collector that groups by multiple classifiers
public static class MultiLevelGroupingCollector<T, K1, K2, A, D>
implements Collector<T, Map<K1, Map<K2, A>>, Map<K1, Map<K2, D>>> {
private final Function<T, K1> firstClassifier;
private final Function<T, K2> secondClassifier;
private final Collector<T, A, D> downstream;
public MultiLevelGroupingCollector(Function<T, K1> firstClassifier,
Function<T, K2> secondClassifier,
Collector<T, A, D> downstream) {
this.firstClassifier = firstClassifier;
this.secondClassifier = secondClassifier;
this.downstream = downstream;
}
@Override
public Supplier<Map<K1, Map<K2, A>>> supplier() {
return HashMap::new;
}
@Override
public BiConsumer<Map<K1, Map<K2, A>>, T> accumulator() {
return (map, element) -> {
K1 firstKey = firstClassifier.apply(element);
K2 secondKey = secondClassifier.apply(element);
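A container = map.computeIfAbsent(firstKey, k -> new HashMap<>())
.computeIfAbsent(secondKey, k -> downstream.supplier().get());
// The container type A has no methods of its own; use the downstream accumulator
downstream.accumulator().accept(container, element);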
};
}
@Override
public BinaryOperator<Map<K1, Map<K2, A>>> combiner() {
return (map1, map2) -> {
for (Map.Entry<K1, Map<K2, A>> entry : map2.entrySet()) {
K1 key = entry.getKey();
Map<K2, A> innerMap2 = entry.getValue();
map1.merge(key, innerMap2, (innerMap1, innerMap2Merge) -> {
for (Map.Entry<K2, A> innerEntry : innerMap2Merge.entrySet()) {
innerMap1.merge(innerEntry.getKey(), innerEntry.getValue(),
downstream.combiner()::apply);
}
return innerMap1;
});
}
return map1;
};
}
@Override
public Function<Map<K1, Map<K2, A>>, Map<K1, Map<K2, D>>> finisher() {
return map -> {
Map<K1, Map<K2, D>> result = new HashMap<>();
for (Map.Entry<K1, Map<K2, A>> entry : map.entrySet()) {
Map<K2, D> finishedInnerMap = new HashMap<>();
for (Map.Entry<K2, A> innerEntry : entry.getValue().entrySet()) {
finishedInnerMap.put(innerEntry.getKey(),
downstream.finisher().apply(innerEntry.getValue()));
}
result.put(entry.getKey(), finishedInnerMap);
}
return result;
};
}
@Override
public Set<Characteristics> characteristics() {
return Set.of();
}
}
// Factory method for multi-level grouping
public static <T, K1, K2, A, D> Collector<T, ?, Map<K1, Map<K2, D>>> multiLevelGrouping(
Function<T, K1> firstClassifier,
Function<T, K2> secondClassifier,
Collector<T, A, D> downstream) {
return new MultiLevelGroupingCollector<>(firstClassifier, secondClassifier, downstream);
}
// Collector for partitioning with statistics
public static class PartitioningWithStatsCollector<T>
implements Collector<T, PartitioningWithStatsCollector.PartitionStatsAccumulator<T>,
PartitioningWithStatsCollector.PartitionStats<T>> {
private final Predicate<T> predicate;
private final ToDoubleFunction<T> mapper;
public PartitioningWithStatsCollector(Predicate<T> predicate, ToDoubleFunction<T> mapper) {
this.predicate = predicate;
this.mapper = mapper;
}
static class PartitionStatsAccumulator<T> {
final List<T> trueElements = new ArrayList<>();
final List<T> falseElements = new ArrayList<>();
double trueSum = 0.0;
double falseSum = 0.0;
// Both functions are passed in: a static nested class cannot read
// the enclosing collector's instance fields (predicate, mapper)
final Predicate<T> predicate;
final ToDoubleFunction<T> mapper;
PartitionStatsAccumulator(Predicate<T> predicate, ToDoubleFunction<T> mapper) {
this.predicate = predicate;
this.mapper = mapper;
}
void accumulate(T element) {
if (predicate.test(element)) {
trueElements.add(element);
trueSum += mapper.applyAsDouble(element);
} else {
falseElements.add(element);
falseSum += mapper.applyAsDouble(element);
}
}
PartitionStatsAccumulator<T> combine(PartitionStatsAccumulator<T> other) {
trueElements.addAll(other.trueElements);
falseElements.addAll(other.falseElements);
trueSum += other.trueSum;
falseSum += other.falseSum;
return this;
}
}
static class PartitionStats<T> {
final List<T> trueElements;
final List<T> falseElements;
final double trueSum;
final double falseSum;
final double trueAverage;
final double falseAverage;
PartitionStats(List<T> trueElements, List<T> falseElements,
double trueSum, double falseSum) {
this.trueElements = trueElements;
this.falseElements = falseElements;
this.trueSum = trueSum;
this.falseSum = falseSum;
this.trueAverage = trueElements.isEmpty() ? 0.0 : trueSum / trueElements.size();
this.falseAverage = falseElements.isEmpty() ? 0.0 : falseSum / falseElements.size();
}
@Override
public String toString() {
return String.format(
"PartitionStats{trueCount=%d, falseCount=%d, trueSum=%.2f, falseSum=%.2f, trueAvg=%.2f, falseAvg=%.2f}",
trueElements.size(), falseElements.size(), trueSum, falseSum, trueAverage, falseAverage
);
}
}
@Override
public Supplier<PartitionStatsAccumulator<T>> supplier() {
return () -> new PartitionStatsAccumulator<>(predicate, mapper);
}
@Override
public BiConsumer<PartitionStatsAccumulator<T>, T> accumulator() {
return PartitionStatsAccumulator::accumulate;
}
@Override
public BinaryOperator<PartitionStatsAccumulator<T>> combiner() {
return PartitionStatsAccumulator::combine;
}
@Override
public Function<PartitionStatsAccumulator<T>, PartitionStats<T>> finisher() {
return acc -> new PartitionStats<>(
acc.trueElements, acc.falseElements, acc.trueSum, acc.falseSum
);
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED);
}
}
// Factory method for partitioning with stats
public static <T> Collector<T, ?, PartitioningWithStatsCollector.PartitionStats<T>> partitioningWithStats(
Predicate<T> predicate, ToDoubleFunction<T> mapper) {
return new PartitioningWithStatsCollector<>(predicate, mapper);
}
// Example usage
static class Employee {
private final String name;
private final String department;
private final String role;
private final double salary;
private final int experience;
public Employee(String name, String department, String role, double salary, int experience) {
this.name = name;
this.department = department;
this.role = role;
this.salary = salary;
this.experience = experience;
}
// Getters
public String getName() { return name; }
public String getDepartment() { return department; }
public String getRole() { return role; }
public double getSalary() { return salary; }
public int getExperience() { return experience; }
public boolean isSenior() { return experience >= 5; }
@Override
public String toString() {
return String.format("Employee{name='%s', dept='%s', role='%s', salary=%.2f, exp=%d}",
name, department, role, salary, experience);
}
}
public static void main(String[] args) {
List<Employee> employees = Arrays.asList(
new Employee("Alice", "Engineering", "Developer", 80000, 3),
new Employee("Bob", "Engineering", "Senior Developer", 120000, 7),
new Employee("Charlie", "Marketing", "Manager", 90000, 6),
new Employee("Diana", "Engineering", "Developer", 85000, 2),
new Employee("Eve", "Marketing", "Specialist", 70000, 4),
new Employee("Frank", "Engineering", "Senior Developer", 130000, 8),
new Employee("Grace", "HR", "Manager", 95000, 5)
);
// Multi-level grouping: Department -> Role -> List of Employees
Map<String, Map<String, List<Employee>>> deptRoleGroups = employees.stream()
.collect(multiLevelGrouping(
Employee::getDepartment,
Employee::getRole,
Collectors.toList()
));
System.out.println("=== Multi-level Grouping ===");
deptRoleGroups.forEach((dept, roleMap) -> {
System.out.println("\nDepartment: " + dept);
roleMap.forEach((role, emps) -> {
System.out.println(" Role: " + role + " (" + emps.size() + " employees)");
emps.forEach(emp -> System.out.println(" - " + emp.getName()));
});
});
// Partitioning with salary statistics
var partitionStats = employees.stream()
.collect(partitioningWithStats(Employee::isSenior, Employee::getSalary));
System.out.println("\n=== Partitioning with Statistics ===");
System.out.println(partitionStats);
System.out.println("\nSenior employees:");
partitionStats.trueElements.forEach(emp ->
System.out.println(" - " + emp.getName() + " (salary: " + emp.getSalary() + ")"));
System.out.println("\nJunior employees:");
partitionStats.falseElements.forEach(emp ->
System.out.println(" - " + emp.getName() + " (salary: " + emp.getSalary() + ")"));
// Multi-level grouping with counting
Map<String, Map<String, Long>> deptRoleCounts = employees.stream()
.collect(multiLevelGrouping(
Employee::getDepartment,
Employee::getRole,
Collectors.counting()
));
System.out.println("\n=== Department-Role Counts ===");
deptRoleCounts.forEach((dept, roleCounts) -> {
System.out.println("Department: " + dept);
roleCounts.forEach((role, count) ->
System.out.println(" " + role + ": " + count)
);
});
}
}
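Both collectors in this section have close built-in counterparts. Nesting Collectors.groupingBy produces the department-to-role map. Collectors.partitioningBy with a downstream collector produces per-partition aggregates. The sketch below assumes Java 16+ for the record. Emp is a trimmed, hypothetical stand-in for the Employee class above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class BuiltInGroupingSketch {

    // Trimmed, hypothetical stand-in for the Employee class (Java 16+ record).
    record Emp(String name, String dept, String role, double salary, int experience) {
        boolean isSenior() { return experience >= 5; }
    }

    // Department -> Role -> head count, via nested groupingBy.
    static Map<String, Map<String, Long>> countByDeptAndRole(List<Emp> emps) {
        return emps.stream().collect(Collectors.groupingBy(
            Emp::dept,
            Collectors.groupingBy(Emp::role, Collectors.counting())));
    }

    // Senior/junior split with the average salary of each side.
    // partitioningBy always contains both the true and the false key.
    static Map<Boolean, Double> averageSalaryBySeniority(List<Emp> emps) {
        return emps.stream().collect(Collectors.partitioningBy(
            Emp::isSenior,
            Collectors.averagingDouble(Emp::salary)));
    }

    public static void main(String[] args) {
        List<Emp> emps = List.of(
            new Emp("Alice", "Engineering", "Developer", 80000, 3),
            new Emp("Bob", "Engineering", "Senior Developer", 120000, 7),
            new Emp("Charlie", "Marketing", "Manager", 90000, 6),
            new Emp("Diana", "Engineering", "Developer", 85000, 2),
            new Emp("Eve", "Marketing", "Specialist", 70000, 4),
            new Emp("Frank", "Engineering", "Senior Developer", 130000, 8),
            new Emp("Grace", "HR", "Manager", 95000, 5));
        System.out.println(countByDeptAndRole(emps));
        System.out.println(averageSalaryBySeniority(emps).get(true)); // prints: 108750.0
    }
}
```

The custom collectors remain useful when several results are needed from one pass, such as the element lists and sums that PartitionStats carries together.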
3. Advanced Custom Collectors
Windowed Collection Collector
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
public class WindowedCollectors {
// Collector that groups elements into sliding windows
public static class SlidingWindowCollector<T>
implements Collector<T, SlidingWindowCollector.WindowAccumulator<T>, List<List<T>>> {
private final int windowSize;
private final int stepSize;
public SlidingWindowCollector(int windowSize, int stepSize) {
if (windowSize <= 0 || stepSize <= 0) {
throw new IllegalArgumentException("Window size and step size must be positive");
}
this.windowSize = windowSize;
this.stepSize = stepSize;
}
static class WindowAccumulator<T> {
private final List<T> buffer = new ArrayList<>();
private final List<List<T>> windows = new ArrayList<>();
private final int windowSize;
private final int stepSize;
private int position = 0;
WindowAccumulator(int windowSize, int stepSize) {
this.windowSize = windowSize;
this.stepSize = stepSize;
}
void accumulate(T element) {
buffer.add(element);
position++;
// Check if we can form a new window
if (position >= windowSize && (position - windowSize) % stepSize == 0) {
int startIndex = position - windowSize;
List<T> window = new ArrayList<>(buffer.subList(startIndex, startIndex + windowSize));
windows.add(window);
}
}
WindowAccumulator<T> combine(WindowAccumulator<T> other) {
// For parallel streams, this is complex for sliding windows
// In practice, sliding windows are often sequential
throw new UnsupportedOperationException("Sliding window collector doesn't support parallel streams");
}
}
@Override
public Supplier<WindowAccumulator<T>> supplier() {
return () -> new WindowAccumulator<>(windowSize, stepSize);
}
@Override
public BiConsumer<WindowAccumulator<T>, T> accumulator() {
return WindowAccumulator::accumulate;
}
@Override
public BinaryOperator<WindowAccumulator<T>> combiner() {
return (acc1, acc2) -> {
throw new UnsupportedOperationException("Sliding window collector doesn't support parallel streams");
};
}
@Override
public Function<WindowAccumulator<T>, List<List<T>>> finisher() {
return acc -> acc.windows;
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // Not CONCURRENT, not UNORDERED
}
}
// Collector for tumbling windows (non-overlapping)
public static class TumblingWindowCollector<T>
implements Collector<T, TumblingWindowCollector.WindowAccumulator<T>, List<List<T>>> {
private final int windowSize;
public TumblingWindowCollector(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("Window size must be positive");
}
this.windowSize = windowSize;
}
static class WindowAccumulator<T> {
private final List<T> currentWindow = new ArrayList<>();
private final List<List<T>> windows = new ArrayList<>();
private final int windowSize;
WindowAccumulator(int windowSize) {
this.windowSize = windowSize;
}
void accumulate(T element) {
currentWindow.add(element);
if (currentWindow.size() >= windowSize) {
windows.add(new ArrayList<>(currentWindow));
currentWindow.clear();
}
}
WindowAccumulator<T> combine(WindowAccumulator<T> other) {
// Replay the right-hand chunk's elements, in encounter order, through this
// accumulator so that window boundaries stay aligned across the split point
// (appending other.windows directly would misplace this chunk's leftovers)
for (List<T> window : other.windows) {
window.forEach(this::accumulate);
}
other.currentWindow.forEach(this::accumulate);
return this;
}
List<List<T>> finish() {
if (!currentWindow.isEmpty()) {
windows.add(new ArrayList<>(currentWindow));
}
return windows;
}
}
@Override
public Supplier<WindowAccumulator<T>> supplier() {
return () -> new WindowAccumulator<>(windowSize);
}
@Override
public BiConsumer<WindowAccumulator<T>, T> accumulator() {
return WindowAccumulator::accumulate;
}
@Override
public BinaryOperator<WindowAccumulator<T>> combiner() {
return WindowAccumulator::combine;
}
@Override
public Function<WindowAccumulator<T>, List<List<T>>> finisher() {
return WindowAccumulator::finish;
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // window boundaries depend on encounter order, so not UNORDERED
}
}
// Factory methods
public static <T> Collector<T, ?, List<List<T>>> slidingWindow(int windowSize, int stepSize) {
return new SlidingWindowCollector<>(windowSize, stepSize);
}
public static <T> Collector<T, ?, List<List<T>>> tumblingWindow(int windowSize) {
return new TumblingWindowCollector<>(windowSize);
}
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("Original data: " + numbers);
// Sliding windows: size=3, step=1
List<List<Integer>> slidingWindows = numbers.stream()
.collect(slidingWindow(3, 1));
System.out.println("\n=== Sliding Windows (size=3, step=1) ===");
for (int i = 0; i < slidingWindows.size(); i++) {
System.out.println("Window " + (i + 1) + ": " + slidingWindows.get(i));
}
// Sliding windows: size=3, step=2
List<List<Integer>> slidingWindowsStep2 = numbers.stream()
.collect(slidingWindow(3, 2));
System.out.println("\n=== Sliding Windows (size=3, step=2) ===");
for (int i = 0; i < slidingWindowsStep2.size(); i++) {
System.out.println("Window " + (i + 1) + ": " + slidingWindowsStep2.get(i));
}
// Tumbling windows: size=4
List<List<Integer>> tumblingWindows = numbers.stream()
.collect(tumblingWindow(4));
System.out.println("\n=== Tumbling Windows (size=4) ===");
for (int i = 0; i < tumblingWindows.size(); i++) {
System.out.println("Window " + (i + 1) + ": " + tumblingWindows.get(i));
}
// Real-world example: Sensor data analysis
List<Double> sensorReadings = Arrays.asList(23.5, 24.1, 25.3, 26.7, 24.8,
23.9, 25.1, 26.2, 27.5, 28.1);
System.out.println("\n=== Sensor Data Analysis ===");
System.out.println("Sensor readings: " + sensorReadings);
// Calculate moving averages using sliding windows
List<Double> movingAverages = sensorReadings.stream()
.collect(slidingWindow(3, 1))
.stream()
.map(window -> window.stream().mapToDouble(Double::doubleValue).average().orElse(0.0))
.toList();
System.out.println("Moving averages (window=3): " + movingAverages);
// Find maximum in each tumbling window
List<Double> windowMaxima = sensorReadings.stream()
.collect(tumblingWindow(4))
.stream()
.map(window -> window.stream().mapToDouble(Double::doubleValue).max().orElse(0.0))
.toList();
System.out.println("Window maxima (size=4): " + windowMaxima);
}
}
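Another way to make tumbling windows safe for parallel streams is to postpone the windowing. The accumulator and combiner only build one ordered list, and the finisher slices that list into windows. This sketch (the name chunked is hypothetical) trades memory for a trivially correct combiner, since every element is held until the end. Like the tumbling collector above, it emits a shorter final window when the input size is not a multiple of the window size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

final class ChunkingSketch {

    // Tumbling (non-overlapping) windows of `size` elements.
    // Accumulator and combiner only maintain one ordered list; the
    // windowing happens once, in the finisher.
    static <T> Collector<T, ?, List<List<T>>> chunked(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return Collector.<T, List<T>, List<List<T>>>of(
            ArrayList::new,                                       // supplier
            List::add,                                            // accumulator
            (left, right) -> { left.addAll(right); return left; }, // combiner keeps order
            all -> {                                              // finisher: slice into windows
                List<List<T>> windows = new ArrayList<>();
                for (int i = 0; i < all.size(); i += size) {
                    windows.add(new ArrayList<>(all.subList(i, Math.min(i + size, all.size()))));
                }
                return windows;
            });
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(numbers.stream().collect(chunked(4)));
        // prints: [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
    }
}
```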
Stateful Custom Collector
import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
public class StatefulCollectors {
// Collector that maintains state across elements
public static class RunningStatisticsCollector<T>
implements Collector<T, RunningStatisticsCollector.RunningStatsAccumulator<T>,
RunningStatisticsCollector.RunningStatistics<T>> {
private final ToDoubleFunction<T> valueExtractor;
public RunningStatisticsCollector(ToDoubleFunction<T> valueExtractor) {
this.valueExtractor = valueExtractor;
}
static class RunningStatsAccumulator<T> {
private final List<DataPoint<T>> dataPoints = new ArrayList<>();
private final ToDoubleFunction<T> valueExtractor;
RunningStatsAccumulator(ToDoubleFunction<T> valueExtractor) {
this.valueExtractor = valueExtractor;
}
void accumulate(T element) {
double value = valueExtractor.applyAsDouble(element);
dataPoints.add(new DataPoint<>(element, value, dataPoints.size()));
// Update running statistics
updateStatistics();
}
RunningStatsAccumulator<T> combine(RunningStatsAccumulator<T> other) {
// Adjust indices of other data points
int startIndex = this.dataPoints.size();
for (DataPoint<T> point : other.dataPoints) {
this.dataPoints.add(new DataPoint<>(
point.element, point.value, point.index + startIndex
));
}
updateStatistics();
return this;
}
private void updateStatistics() {
// Intentionally empty hook: running sums and averages are computed once,
// in RunningStatistics, after all elements (and any combined chunks) are in order
}
}
static class DataPoint<T> {
final T element;
final double value;
final int index;
double runningAverage;
double runningSum;
DataPoint(T element, double value, int index) {
this.element = element;
this.value = value;
this.index = index;
}
}
static class RunningStatistics<T> {
private final List<DataPoint<T>> dataPoints;
private final double finalAverage;
private final double finalSum;
private final int count;
RunningStatistics(List<DataPoint<T>> dataPoints) {
this.dataPoints = dataPoints;
this.count = dataPoints.size();
this.finalSum = dataPoints.stream().mapToDouble(dp -> dp.value).sum();
this.finalAverage = count > 0 ? finalSum / count : 0.0;
// Calculate running statistics
calculateRunningStats();
}
private void calculateRunningStats() {
double runningSum = 0.0;
for (int i = 0; i < dataPoints.size(); i++) {
DataPoint<T> point = dataPoints.get(i);
runningSum += point.value;
point.runningSum = runningSum;
point.runningAverage = runningSum / (i + 1);
}
}
public void printRunningStats() {
System.out.println("Index\tValue\tRunning Sum\tRunning Avg");
System.out.println("-----\t-----\t-----------\t-----------");
for (DataPoint<T> point : dataPoints) {
System.out.printf("%d\t%.2f\t%.2f\t\t%.2f%n",
point.index, point.value, point.runningSum, point.runningAverage);
}
System.out.printf("%nFinal: Count=%d, Total=%.2f, Average=%.2f%n",
count, finalSum, finalAverage);
}
public List<Double> getRunningAverages() {
return dataPoints.stream()
.map(dp -> dp.runningAverage)
.toList();
}
public List<Double> getRunningSums() {
return dataPoints.stream()
.map(dp -> dp.runningSum)
.toList();
}
}
@Override
public Supplier<RunningStatsAccumulator<T>> supplier() {
return () -> new RunningStatsAccumulator<>(valueExtractor);
}
@Override
public BiConsumer<RunningStatsAccumulator<T>, T> accumulator() {
return RunningStatsAccumulator::accumulate;
}
@Override
public BinaryOperator<RunningStatsAccumulator<T>> combiner() {
return RunningStatsAccumulator::combine;
}
@Override
public Function<RunningStatsAccumulator<T>, RunningStatistics<T>> finisher() {
return acc -> new RunningStatistics<>(acc.dataPoints); // the constructor takes the data points, not the accumulator
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // running values depend on encounter order, so not UNORDERED
}
}
// Factory method
public static <T> Collector<T, ?, RunningStatisticsCollector.RunningStatistics<T>> runningStatistics(
ToDoubleFunction<T> valueExtractor) {
return new RunningStatisticsCollector<>(valueExtractor);
}
// Collector with external state
public static class StatefulThresholdCollector<T>
implements Collector<T, StatefulThresholdCollector.ThresholdAccumulator<T>, List<T>> {
private final Predicate<T> thresholdPredicate;
private final int maxElements;
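// External state stored on the collector instance itself. volatile gives visibility
// but does not make ++ atomic; this is tolerable only because the collector is
// sequential-only and each thresholdCollector(...) call creates a fresh instance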
private volatile int elementsProcessed = 0;
private volatile int elementsPassed = 0;
public StatefulThresholdCollector(Predicate<T> thresholdPredicate, int maxElements) {
this.thresholdPredicate = thresholdPredicate;
this.maxElements = maxElements;
}
static class ThresholdAccumulator<T> {
private final List<T> passedElements = new ArrayList<>();
private final Predicate<T> thresholdPredicate;
private final int maxElements;
private int count = 0;
ThresholdAccumulator(Predicate<T> thresholdPredicate, int maxElements) {
this.thresholdPredicate = thresholdPredicate;
this.maxElements = maxElements;
}
void accumulate(T element) {
count++;
if (thresholdPredicate.test(element) && passedElements.size() < maxElements) {
passedElements.add(element);
}
}
ThresholdAccumulator<T> combine(ThresholdAccumulator<T> other) {
// For this collector, we don't combine well due to maxElements constraint
// In practice, such stateful collectors often work best sequentially
throw new UnsupportedOperationException("Stateful threshold collector doesn't support parallel streams");
}
}
@Override
public Supplier<ThresholdAccumulator<T>> supplier() {
return () -> new ThresholdAccumulator<>(thresholdPredicate, maxElements);
}
@Override
public BiConsumer<ThresholdAccumulator<T>, T> accumulator() {
return (acc, element) -> {
elementsProcessed++;
acc.accumulate(element);
if (acc.thresholdPredicate.test(element)) {
elementsPassed++;
}
};
}
@Override
public BinaryOperator<ThresholdAccumulator<T>> combiner() {
return (acc1, acc2) -> {
throw new UnsupportedOperationException("Stateful threshold collector doesn't support parallel streams");
};
}
@Override
public Function<ThresholdAccumulator<T>, List<T>> finisher() {
return acc -> {
System.out.printf("Processed %d elements, %d passed threshold%n",
elementsProcessed, elementsPassed);
return acc.passedElements;
};
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(); // Not concurrent
}
}
public static <T> Collector<T, ?, List<T>> thresholdCollector(
Predicate<T> thresholdPredicate, int maxElements) {
return new StatefulThresholdCollector<>(thresholdPredicate, maxElements);
}
public static void main(String[] args) {
List<Double> values = Arrays.asList(10.5, 25.3, 15.7, 30.2, 18.9, 35.1, 12.4, 28.6);
System.out.println("Original values: " + values);
// Running statistics
RunningStatisticsCollector.RunningStatistics<Double> stats = values.stream()
.collect(runningStatistics(Double::doubleValue));
System.out.println("\n=== Running Statistics ===");
stats.printRunningStats();
System.out.println("\nRunning averages: " + stats.getRunningAverages());
System.out.println("Running sums: " + stats.getRunningSums());
// Threshold collector
List<Double> highValues = values.stream()
.collect(thresholdCollector(v -> v > 20.0, 3));
System.out.println("\n=== Threshold Collector (values > 20.0, max 3) ===");
System.out.println("High values: " + highValues);
// Real-world example: Stock prices
List<Double> stockPrices = Arrays.asList(150.25, 152.30, 148.75, 155.40,
153.20, 157.80, 159.25, 156.90);
System.out.println("\n=== Stock Price Analysis ===");
System.out.println("Stock prices: " + stockPrices);
RunningStatisticsCollector.RunningStatistics<Double> stockStats = stockPrices.stream()
.collect(runningStatistics(Double::doubleValue));
System.out.println("\nStock Price Running Statistics:");
stockStats.printRunningStats();
// Find prices above threshold
List<Double> highPrices = stockPrices.stream()
.collect(thresholdCollector(price -> price > 155.0, 5));
System.out.println("\nHigh stock prices (> 155.0): " + highPrices);
}
}
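Running statistics can be made parallel-safe. Each partial result stores values relative to the start of its own chunk, and the combiner shifts the right-hand chunk by the left chunk's total. The sketch below applies this idea to running sums, with hypothetical names. It uses Long rather than Double so that sequential and parallel results match exactly, because floating-point addition is not associative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.LongStream;

final class RunningSumSketch {

    // Prefix (running) sums in encounter order.
    static Collector<Long, List<Long>, List<Long>> runningSums() {
        Supplier<List<Long>> supplier = ArrayList::new;
        // Each new entry = previous running total (if any) + current value.
        BiConsumer<List<Long>, Long> accumulator = (sums, value) ->
            sums.add(sums.isEmpty() ? value : sums.get(sums.size() - 1) + value);
        // Right-hand sums are relative to their own chunk: shift them by the
        // left chunk's final total before appending.
        BinaryOperator<List<Long>> combiner = (left, right) -> {
            long offset = left.isEmpty() ? 0L : left.get(left.size() - 1);
            for (long s : right) {
                left.add(s + offset);
            }
            return left;
        };
        // Three-function Collector.of: identity finisher, no UNORDERED flag.
        return Collector.of(supplier, accumulator, combiner);
    }

    public static void main(String[] args) {
        System.out.println(LongStream.rangeClosed(1, 5).boxed().collect(runningSums()));
        // prints: [1, 3, 6, 10, 15]
    }
}
```

Unlike the threshold collector, this design keeps all state inside the container. That is what allows the combiner to be implemented instead of throwing.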
Summary
Key Components of a Custom Collector:
- Supplier: () -> A, creates the accumulation container
- Accumulator: (A, T) -> void, adds an element to the container
- Combiner: (A, A) -> A, merges two containers (used by parallel streams)
- Finisher: (A) -> R, transforms the container into the final result
- Characteristics: a set of enum values describing the collector's behavior
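To see how these components fit together, the sketch below mimics a two-way split in a parallel collect. It creates one container per chunk, accumulates each chunk, combines left with right, and then finishes. collectInTwoChunks is a hypothetical helper meant for illustration and testing. Real parallel streams split adaptively and may combine many more partial containers.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CollectDriverSketch {

    // Hypothetical helper: drives a Collector's functions by hand over two chunks.
    static <T, A, R> R collectInTwoChunks(List<T> items, Collector<? super T, A, R> collector) {
        int mid = items.size() / 2;
        A left = collector.supplier().get();   // one container per chunk
        A right = collector.supplier().get();
        for (T t : items.subList(0, mid)) {
            collector.accumulator().accept(left, t);
        }
        for (T t : items.subList(mid, items.size())) {
            collector.accumulator().accept(right, t);
        }
        A merged = collector.combiner().apply(left, right); // left precedes right
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<String> names = List.of("Alice", "Bob", "Charlie", "Diana", "Eve");
        System.out.println(collectInTwoChunks(names, Collectors.joining(", ")));
        // prints: Alice, Bob, Charlie, Diana, Eve
    }
}
```

Comparing this helper's output with a plain sequential collect is a cheap way to check that a custom combiner agrees with its accumulator.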
Common Characteristics:
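- CONCURRENT: the accumulator may be called concurrently from multiple threads on the same result container, so the container must be thread-safe
- UNORDERED: the result does not depend on the encounter order of the input; declare it only when that is actually true
- IDENTITY_FINISH: the finisher is the identity function and may be skipped, which requires the container type A to be assignable to the result type R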
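Collector.of sets characteristics differently depending on the overload. The three-function form (no finisher) always reports IDENTITY_FINISH. The form that takes a finisher reports exactly the characteristics passed in. The following is a small sketch for inspecting them; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;

final class CharacteristicsSketch {

    // Three-function Collector.of: the container itself is the result.
    static Set<Collector.Characteristics> withoutFinisher() {
        Collector<String, List<String>, List<String>> c = Collector.of(
            ArrayList::new,
            List::add,
            (a, b) -> { a.addAll(b); return a; });
        return c.characteristics();
    }

    // Four-function Collector.of: explicit finisher; counting elements does
    // not depend on order, so UNORDERED is a truthful declaration here.
    static Set<Collector.Characteristics> withFinisher() {
        Collector<String, List<String>, Integer> c = Collector.of(
            ArrayList::new,
            List::add,
            (a, b) -> { a.addAll(b); return a; },
            List::size,
            Collector.Characteristics.UNORDERED);
        return c.characteristics();
    }

    public static void main(String[] args) {
        System.out.println(withoutFinisher()); // prints: [IDENTITY_FINISH]
        System.out.println(withFinisher());    // prints: [UNORDERED]
    }
}
```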
Best Practices:
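- Keep mutable state inside the accumulation container rather than in fields of the collector itself
- Handle empty streams gracefully in the finisher
- Implement a combiner that preserves encounter order, or throw UnsupportedOperationException from it if the collector is sequential-only
- Accumulate into mutable containers (the accumulator returns void, so it must mutate its container), and return an unmodifiable result from the finisher if callers should not change it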
- Document thread-safety requirements
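One way to keep a mutable container during accumulation while still handing callers a result they cannot modify is Collectors.collectingAndThen, which adds a finishing step. The method name in this sketch is hypothetical. On Java 10+, Collectors.toUnmodifiableList() is a built-in alternative, but it rejects null elements.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class UnmodifiableResultSketch {

    // Accumulate into the mutable list that toList() uses, then wrap the
    // finished list so that callers cannot modify it.
    static <T> Collector<T, ?, List<T>> toUnmodifiableListSketch() {
        return Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList);
    }

    public static void main(String[] args) {
        List<String> letters = Stream.of("a", "b", "c").collect(toUnmodifiableListSketch());
        System.out.println(letters); // prints: [a, b, c]
    }
}
```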
Use Cases for Custom Collectors:
- Aggregations that would otherwise require multiple passes over the data (for example, min and max together)
- Specialized data structures not covered by built-in collectors
- Stateful computations across stream elements
- Windowed operations and sliding windows
- Multi-level grouping and partitioning
Performance Considerations:
- Avoid object creation in accumulator for high-performance scenarios
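- Use primitive functional interfaces (ToDoubleFunction, ObjIntConsumer) and primitive fields to avoid boxing; note that IntStream, LongStream, and DoubleStream do not accept a Collector, only a three-argument collect(supplier, accumulator, combiner)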
- Consider collector characteristics for parallel stream optimization
- Benchmark custom collectors against alternative approaches
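Primitive streams such as IntStream have no collect(Collector) method. They offer a three-argument collect(Supplier, ObjIntConsumer, BiConsumer) that accumulates primitive values without boxing. The sketch below computes a count and a sum into a long[] container; the method name is hypothetical. The combiner here is a BiConsumer that merges into its first argument, unlike a Collector's BinaryOperator.

```java
import java.util.stream.IntStream;

final class PrimitiveCollectSketch {

    // Container is a long[2] holding {count, sum}; ints are never boxed.
    static long[] countAndSum(IntStream values) {
        return values.collect(
            () -> new long[2],                              // supplier
            (acc, v) -> { acc[0]++; acc[1] += v; },         // ObjIntConsumer accumulator
            (a, b) -> { a[0] += b[0]; a[1] += b[1]; });     // BiConsumer combiner (merges into a)
    }

    public static void main(String[] args) {
        long[] cs = countAndSum(IntStream.rangeClosed(1, 100));
        System.out.println(cs[0] + " " + cs[1]); // prints: 100 5050
    }
}
```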
Custom collectors provide a powerful way to extend Java Streams with specialized reduction operations, enabling complex data processing patterns while maintaining the fluent API style of the Streams framework.