Table of Contents
- Introduction to Reduction
- Basic Reduction Operations
- Parallel Reduction Fundamentals
- Associativity Requirement
- Identity and Combiner
- Custom Reducers
- Performance Considerations
- Common Patterns
Introduction to Reduction
Reduction is the process of combining all elements of a stream into a single result. In parallel streams, reduction becomes more complex as elements are processed concurrently and results need to be combined.
Sequential vs Parallel Reduction
import java.util.*;
import java.util.stream.*;
/**
 * Introductory comparison of sequential and parallel reduction, plus a
 * traced walk-through of how reduce() folds elements together.
 */
public class ReductionIntroduction {

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("=== Sequential vs Parallel Reduction ===");

        // Time the sequential fold.
        long serialStart = System.nanoTime();
        int serialTotal = values.stream().reduce(0, Integer::sum);
        long serialElapsed = System.nanoTime() - serialStart;

        // Time the parallel fold of the same data.
        long concurrentStart = System.nanoTime();
        int concurrentTotal = values.parallelStream().reduce(0, Integer::sum);
        long concurrentElapsed = System.nanoTime() - concurrentStart;

        System.out.println("Sequential sum: " + serialTotal + " in " + serialElapsed + "ns");
        System.out.println("Parallel sum: " + concurrentTotal + " in " + concurrentElapsed + "ns");
        System.out.println("Results equal: " + (serialTotal == concurrentTotal));
        demonstrateReductionProcess();
    }

    /**
     * Logs every combining step of a string-concatenation reduce, first
     * sequentially and then in parallel (where step order may differ).
     */
    public static void demonstrateReductionProcess() {
        System.out.println("\n=== Reduction Process ===");
        List<String> words = Arrays.asList("Hello", "World", "Stream", "API");

        String joined = words.stream()
                .reduce("", (running, word) -> traceCombine(running, word));
        System.out.println("Final result: " + joined.trim());

        System.out.println("\nParallel reduction (order may vary):");
        String joinedInParallel = words.parallelStream()
                .reduce("", (running, word) -> traceCombine(running, word));
        System.out.println("Final result: " + joinedInParallel.trim());
    }

    // Performs one combine step and prints a trace line for it.
    private static String traceCombine(String partial, String element) {
        String result = partial + " " + element;
        System.out.println("Combining: '" + partial + "' + '" + element + "' = '" + result + "'");
        return result;
    }
}
Basic Reduction Operations
Standard Reduction Operations
import java.util.*;
import java.util.stream.*;
/**
 * Catalogue of the standard reduce() variants: sum, product, max/min,
 * concatenation, counting, and the three-argument form.
 */
public class BasicReductionOperations {

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<String> words = Arrays.asList("apple", "banana", "cherry", "date");
        System.out.println("=== Basic Reduction Operations ===");

        // 1. Sum: identity 0 with associative addition.
        int total = numbers.stream().reduce(0, (x, y) -> x + y);
        System.out.println("Sum: " + total);

        // 2. Product: identity 1.
        int product = numbers.stream().reduce(1, (x, y) -> x * y);
        System.out.println("Product: " + product);

        // 3. Max: no identity exists, so an Optional comes back.
        Optional<Integer> largest = numbers.stream().reduce(Integer::max);
        System.out.println("Max: " + largest.orElse(-1));

        // 4. Min: symmetric to max.
        Optional<Integer> smallest = numbers.stream().reduce(Integer::min);
        System.out.println("Min: " + smallest.orElse(-1));

        // 5. Concatenation; the leading space is trimmed afterwards.
        String joined = words.stream().reduce("", (acc, w) -> acc + " " + w);
        System.out.println("Concatenated: " + joined.trim());

        // 6. Counting via the three-argument reduce (accumulator ignores
        // the element's value and just adds 1).
        long tally = numbers.stream()
                .reduce(0, (acc, ignored) -> acc + 1, Integer::sum);
        System.out.println("Count via reduction: " + tally);
        demonstrateThreeArgumentReduce();
    }

    /**
     * Shows the identity/accumulator/combiner roles in a parallel reduce,
     * logging which thread runs each step.
     */
    public static void demonstrateThreeArgumentReduce() {
        System.out.println("\n=== Three-Argument Reduce ===");
        List<String> words = Arrays.asList("Hello", "World", "Stream", "API");
        String result = words.parallelStream()
                .reduce("",
                        (soFar, word) -> {
                            System.out.println(Thread.currentThread().getName() +
                                    " - Accumulator: " + soFar + " + " + word);
                            return soFar + " " + word;
                        },
                        (left, right) -> {
                            System.out.println(Thread.currentThread().getName() +
                                    " - Combiner: " + left + " + " + right);
                            return left + right;
                        });
        System.out.println("Final: " + result.trim());
    }
}
Reduction with Primitive Streams
import java.util.*;
import java.util.stream.*;
/**
 * Reductions on primitive streams (IntStream/LongStream), which avoid
 * the boxing overhead of Stream&lt;Integer&gt;.
 */
public class PrimitiveStreamReduction {

    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println("=== Primitive Stream Reduction ===");

        // Built-in IntStream reductions; each call builds a fresh stream.
        int sum = IntStream.of(numbers).sum();
        OptionalDouble average = IntStream.of(numbers).average();
        OptionalInt max = IntStream.of(numbers).max();
        OptionalInt min = IntStream.of(numbers).min();
        System.out.println("Sum: " + sum);
        System.out.println("Average: " + average.orElse(0));
        System.out.println("Max: " + max.orElse(-1));
        System.out.println("Min: " + min.orElse(-1));

        // Custom reduction: factorial of 5, widened to long before
        // multiplying to delay overflow.
        long product = IntStream.range(1, 6)
                .asLongStream()
                .reduce(1, (a, b) -> a * b);
        System.out.println("Product 1-5: " + product);

        // count/sum/min/max/average in one pass.
        IntSummaryStatistics stats = IntStream.of(numbers)
                .summaryStatistics();
        System.out.println("\nStatistics: " + stats);
        demonstrateParallelPrimitiveReduction();
    }

    /**
     * Times a sequential vs a parallel primitive sum.
     *
     * BUG FIX: the original created ONE IntStream and consumed it twice.
     * A stream supports only a single terminal operation, so the second
     * sum() threw IllegalStateException ("stream has already been operated
     * upon or closed"). Each measurement now builds its own stream.
     */
    public static void demonstrateParallelPrimitiveReduction() {
        System.out.println("\n=== Parallel Primitive Reduction ===");
        long seqStart = System.nanoTime();
        int sequentialSum = IntStream.range(1, 1001).sum();
        long seqTime = System.nanoTime() - seqStart;

        long parStart = System.nanoTime();
        int parallelSum = IntStream.range(1, 1001).parallel().sum();
        long parTime = System.nanoTime() - parStart;

        System.out.println("Sequential sum: " + sequentialSum + " in " + seqTime + "ns");
        System.out.println("Parallel sum: " + parallelSum + " in " + parTime + "ns");
        System.out.println("Speedup: " + (double) seqTime / parTime + "x");
    }
}
Parallel Reduction Fundamentals
How Parallel Reduction Works
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
/**
 * Demonstrates the mechanics of a parallel reduce(): the accumulator folds
 * stream elements into per-thread partial results, and the combiner merges
 * those partials. Console output records which thread runs each step.
 */
public class ParallelReductionMechanics {
public static void main(String[] args) {
System.out.println("=== Parallel Reduction Mechanics ===");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Visualize parallel reduction process
int result = numbers.parallelStream()
.reduce(0,
// Accumulator: runs on worker threads. NOTE: the parameter named
// "identity" is really the running partial result; it only equals
// the identity value 0 on the first call of each subtask.
(identity, element) -> {
String thread = Thread.currentThread().getName();
int sum = identity + element;
System.out.println(thread + " - Accumulate: " + identity + " + " + element + " = " + sum);
return sum;
},
// Combiner: merges two partial sums produced by different subtasks.
(partial1, partial2) -> {
String thread = Thread.currentThread().getName();
int combined = partial1 + partial2;
System.out.println(thread + " - Combine: " + partial1 + " + " + partial2 + " = " + combined);
return combined;
});
System.out.println("Final result: " + result);
demonstrateForkJoinBehavior();
}
/**
 * Shows that parallel streams run on the common ForkJoinPool by counting
 * how many accumulator invocations each worker thread performs.
 */
public static void demonstrateForkJoinBehavior() {
System.out.println("\n=== ForkJoin Pool Behavior ===");
// Create a larger dataset to see proper parallelization
List<Integer> largeList = IntStream.range(1, 101)
.boxed()
.collect(Collectors.toList());
System.out.println("ForkJoinPool parallelism: " +
ForkJoinPool.commonPool().getParallelism());
// ConcurrentHashMap so the instrumentation below is safe to update
// from multiple worker threads at once.
Map<String, Integer> threadUsage = new ConcurrentHashMap<>();
int sum = largeList.parallelStream()
.reduce(0,
// Side effects in an accumulator are normally discouraged; this one
// is deliberate instrumentation and the target map is thread-safe.
(acc, num) -> {
String thread = Thread.currentThread().getName();
threadUsage.merge(thread, 1, Integer::sum);
return acc + num;
},
(a, b) -> a + b);
System.out.println("Thread usage count:");
threadUsage.forEach((thread, count) ->
System.out.println(" " + thread + ": " + count + " operations"));
System.out.println("Total sum: " + sum);
// Gauss formula n*(n+1)/2 for 1..100 as a correctness cross-check.
System.out.println("Expected sum: " + (100 * 101 / 2));
}
}
Reduction with Complex Objects
import java.util.*;
import java.util.stream.*;
/**
 * Immutable value object representing a single monetary transaction.
 *
 * Fields are final: instances are shared across parallel-stream worker
 * threads in the demos below, and immutability makes that safe by
 * construction.
 *
 * NOTE(review): double is used for money only to keep the demo simple;
 * production financial code should prefer BigDecimal or scaled longs.
 */
class Transaction {
    private final String id;
    private final double amount;
    private final String currency;

    public Transaction(String id, double amount, String currency) {
        this.id = id;
        this.amount = amount;
        this.currency = currency;
    }

    public double getAmount() { return amount; }

    public String getCurrency() { return currency; }

    public String getId() { return id; }

    @Override
    public String toString() {
        return String.format("Transaction{id='%s', amount=%.2f, currency='%s'}",
                id, amount, currency);
    }
}
/**
 * Aggregate of transaction statistics (total, count, max, min).
 *
 * combine() and accumulate() return NEW instances, which is what makes the
 * class safe to use with Stream.reduce() on a parallel stream.
 *
 * Fields are package-private (not private) so sibling demo code in this
 * file can perform a mutable collect()-style reduction on them directly;
 * with private fields that code would not compile.
 */
class FinancialSummary {
    double totalAmount;
    int transactionCount;
    double maxAmount;
    double minAmount;

    /**
     * Empty summary, usable as a reduce() identity.
     *
     * BUG FIX: max now starts at NEGATIVE_INFINITY. The original used
     * Double.MIN_VALUE, which is the smallest POSITIVE double — with only
     * negative amounts the reported max would silently stay at ~4.9e-324.
     * min symmetrically starts at POSITIVE_INFINITY.
     */
    public FinancialSummary() {
        this.totalAmount = 0;
        this.transactionCount = 0;
        this.maxAmount = Double.NEGATIVE_INFINITY;
        this.minAmount = Double.POSITIVE_INFINITY;
    }

    public FinancialSummary(double total, int count, double max, double min) {
        this.totalAmount = total;
        this.transactionCount = count;
        this.maxAmount = max;
        this.minAmount = min;
    }

    /** Combiner: merges two partial summaries into a new one. */
    public FinancialSummary combine(FinancialSummary other) {
        return new FinancialSummary(
                this.totalAmount + other.totalAmount,
                this.transactionCount + other.transactionCount,
                Math.max(this.maxAmount, other.maxAmount),
                Math.min(this.minAmount, other.minAmount)
        );
    }

    /** Accumulator: returns a new summary extended with one transaction. */
    public FinancialSummary accumulate(Transaction transaction) {
        return new FinancialSummary(
                this.totalAmount + transaction.getAmount(),
                this.transactionCount + 1,
                Math.max(this.maxAmount, transaction.getAmount()),
                Math.min(this.minAmount, transaction.getAmount())
        );
    }

    @Override
    public String toString() {
        return String.format(
                "Summary{total=%.2f, count=%d, max=%.2f, min=%.2f, avg=%.2f}",
                totalAmount, transactionCount, maxAmount, minAmount,
                transactionCount > 0 ? totalAmount / transactionCount : 0
        );
    }
}
/**
 * Reduces a list of Transaction objects into a summary, first with the
 * immutable reduce() pattern and then with a mutable collect().
 */
public class ComplexObjectReduction {

    public static void main(String[] args) {
        System.out.println("=== Complex Object Reduction ===");
        List<Transaction> transactions = Arrays.asList(
                new Transaction("T1", 100.0, "USD"),
                new Transaction("T2", 250.5, "USD"),
                new Transaction("T3", 75.25, "USD"),
                new Transaction("T4", 300.0, "USD"),
                new Transaction("T5", 50.0, "USD"),
                new Transaction("T6", 425.75, "USD")
        );

        // Sequential reduction with a custom accumulator object.
        FinancialSummary sequentialSummary = transactions.stream()
                .reduce(
                        new FinancialSummary(),
                        FinancialSummary::accumulate,
                        FinancialSummary::combine
                );
        System.out.println("Sequential: " + sequentialSummary);

        // Parallel reduction: safe because accumulate()/combine() never
        // mutate their receiver — they return new FinancialSummary objects.
        FinancialSummary parallelSummary = transactions.parallelStream()
                .reduce(
                        new FinancialSummary(),
                        FinancialSummary::accumulate,
                        FinancialSummary::combine
                );
        System.out.println("Parallel: " + parallelSummary);
        demonstrateMutableReduction();
    }

    /**
     * Mutable reduction with collect() — often preferable to reduce() when
     * the result container is cheap to mutate.
     *
     * BUG FIX: the original lambdas wrote directly to FinancialSummary's
     * private fields, which does not compile from this class.
     * DoubleSummaryStatistics is the JDK's ready-made mutable container for
     * exactly this aggregation (count/sum/min/max/average).
     */
    public static void demonstrateMutableReduction() {
        System.out.println("\n=== Mutable Reduction ===");
        List<Transaction> transactions = Arrays.asList(
                new Transaction("T1", 100.0, "USD"),
                new Transaction("T2", 250.5, "USD"),
                new Transaction("T3", 75.25, "USD")
        );
        DoubleSummaryStatistics collectedSummary = transactions.parallelStream()
                .collect(Collectors.summarizingDouble(Transaction::getAmount));
        System.out.println("Collected: " + collectedSummary);
    }
}
Associativity Requirement
Understanding Associativity
import java.util.*;
import java.util.stream.*;
/**
 * Shows why the reduce() accumulator must be associative: associative
 * operations give identical sequential/parallel results, non-associative
 * ones silently produce wrong parallel results.
 */
public class AssociativityRequirement {

    public static void main(String[] args) {
        System.out.println("=== Associativity Requirement ===");
        // Associative operation: (a + b) + c = a + (b + c)
        demonstrateAssociativeOperations();
        demonstrateNonAssociativeOperations();
        // BUG FIX: this helper existed but was never invoked.
        verifyAssociativity();
    }

    /** Addition, multiplication and max all agree sequential vs parallel. */
    public static void demonstrateAssociativeOperations() {
        System.out.println("\n=== Associative Operations ===");
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        int sumSequential = numbers.stream()
                .reduce(0, (a, b) -> a + b);
        int sumParallel = numbers.parallelStream()
                .reduce(0, (a, b) -> a + b);
        System.out.println("Addition - Sequential: " + sumSequential + ", Parallel: " + sumParallel);

        int productSequential = numbers.stream()
                .reduce(1, (a, b) -> a * b);
        int productParallel = numbers.parallelStream()
                .reduce(1, (a, b) -> a * b);
        System.out.println("Multiplication - Sequential: " + productSequential + ", Parallel: " + productParallel);

        Optional<Integer> maxSequential = numbers.stream()
                .reduce(Integer::max);
        Optional<Integer> maxParallel = numbers.parallelStream()
                .reduce(Integer::max);
        // BUG FIX: the original printed an undefined variable "parallel".
        System.out.println("Max - Sequential: " + maxSequential + ", Parallel: " + maxParallel);
    }

    /** Subtraction and division are not associative — parallel results break. */
    public static void demonstrateNonAssociativeOperations() {
        System.out.println("\n=== Non-Associative Operations ===");
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Subtraction is NOT associative: (a - b) - c != a - (b - c)
        System.out.println("Subtraction example:");
        System.out.println("(1 - 2) - 3 = " + ((1 - 2) - 3));
        System.out.println("1 - (2 - 3) = " + (1 - (2 - 3)));

        // Using a non-associative operation in a parallel stream gives wrong results.
        int subtractSequential = numbers.stream()
                .reduce(0, (a, b) -> a - b);
        int subtractParallel = numbers.parallelStream()
                .reduce(0, (a, b) -> a - b);
        System.out.println("Subtraction - Sequential: " + subtractSequential + ", Parallel: " + subtractParallel);
        System.out.println("WARNING: Parallel result is incorrect due to non-associativity!");

        // Division is also NOT associative.
        System.out.println("\nDivision example:");
        System.out.println("(8 / 4) / 2 = " + ((8 / 4) / 2));
        System.out.println("8 / (4 / 2) = " + (8 / (4 / 2)));
    }

    /** Spot-checks associativity of one operation on sample operands. */
    public static void verifyAssociativity() {
        System.out.println("\n=== Verifying Associativity ===");
        // BUG FIX: the original used BiFunction, which lives in
        // java.util.function — a package this listing never imports. The
        // fully qualified name keeps the snippet self-contained.
        java.util.function.BinaryOperator<Integer> operation = (a, b) -> a + b; // Change to test other operations
        int a = 5, b = 3, c = 2;
        int leftAssociative = operation.apply(operation.apply(a, b), c);
        int rightAssociative = operation.apply(a, operation.apply(b, c));
        System.out.println("Operation: " + operation);
        System.out.println("(" + a + " op " + b + ") op " + c + " = " + leftAssociative);
        System.out.println(a + " op (" + b + " op " + c + ") = " + rightAssociative);
        System.out.println("Is associative: " + (leftAssociative == rightAssociative));
    }
}
Associativity in Custom Operations
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
public class CustomOperationAssociativity {
// Custom operation: weighted average accumulator
static class WeightedAverage {
double sum;
double weight;
WeightedAverage() {
this(0, 0);
}
WeightedAverage(double sum, double weight) {
this.sum = sum;
this.weight = weight;
}
// This operation MUST be associative for parallel reduction
WeightedAverage combine(WeightedAverage other) {
return new WeightedAverage(
this.sum + other.sum,
this.weight + other.weight
);
}
double getAverage() {
return weight == 0 ? 0 : sum / weight;
}
}
public static void main(String[] args) {
System.out.println("=== Custom Operation Associativity ===");
// Test data: (value, weight) pairs
List<double[]> weightedValues = Arrays.asList(
new double[]{10, 1}, // value 10, weight 1
new double[]{20, 2}, // value 20, weight 2
new double[]{30, 3}, // value 30, weight 3
new double[]{40, 4} // value 40, weight 4
);
// Weighted average calculation
WeightedAverage sequentialResult = weightedValues.stream()
.reduce(
new WeightedAverage(),
(acc, pair) -> new WeightedAverage(
acc.sum + pair[0] * pair[1],
acc.weight + pair[1]
),
WeightedAverage::combine
);
WeightedAverage parallelResult = weightedValues.parallelStream()
.reduce(
new WeightedAverage(),
(acc, pair) -> new WeightedAverage(
acc.sum + pair[0] * pair[1],
acc.weight + pair[1]
),
WeightedAverage::combine
);
System.out.println("Sequential weighted average: " + sequentialResult.getAverage());
System.out.println("Parallel weighted average: " + parallelResult.getAverage());
System.out.println("Results equal: " +
(sequentialResult.getAverage() == parallelResult.getAverage()));
verifyCustomAssociativity();
}
public static void verifyCustomAssociativity() {
System.out.println("\n=== Verifying Custom Operation Associativity ===");
// Test if our weighted average operation is associative
WeightedAverage a = new WeightedAverage(10 * 1, 1); // value 10, weight 1
WeightedAverage b = new WeightedAverage(20 * 2, 2); // value 20, weight 2
WeightedAverage c = new WeightedAverage(30 * 3, 3); // value 30, weight 3
WeightedAverage left = a.combine(b).combine(c);
WeightedAverage right = a.combine(b.combine(c));
System.out.println("(A combine B) combine C: " + left.getAverage());
System.out.println("A combine (B combine C): " + right.getAverage());
System.out.println("Operation is associative: " +
(left.getAverage() == right.getAverage()));
}
}
Identity and Combiner
Identity Element Requirements
import java.util.*;
import java.util.stream.*;
/**
 * Shows which values are proper reduce() identities, and what goes wrong
 * when a non-identity value is supplied to a parallel reduction.
 */
public class IdentityElement {

    public static void main(String[] args) {
        System.out.println("=== Identity Element Requirements ===");
        demonstrateIdentityProperties();
        demonstrateIdentityIssues();
    }

    /** Correct identity values for common reductions. */
    public static void demonstrateIdentityProperties() {
        System.out.println("\n=== Proper Identity Elements ===");
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);

        // 0 is the additive identity: a + 0 == a.
        int total = values.parallelStream().reduce(0, Integer::sum);
        System.out.println("Sum with identity 0: " + total);

        // 1 is the multiplicative identity: a * 1 == a.
        int product = values.parallelStream().reduce(1, (lhs, rhs) -> lhs * rhs);
        System.out.println("Product with identity 1: " + product);

        // "" is the identity for concatenation: s + "" == s.
        List<String> words = Arrays.asList("Hello", "World");
        String joined = words.parallelStream().reduce("", (acc, next) -> acc + " " + next);
        System.out.println("Concatenation with identity '': " + joined.trim());

        // Max has no identity value, so the Optional-returning form is used.
        Optional<Integer> largest = values.parallelStream().reduce(Integer::max);
        System.out.println("Max without identity: " + largest.orElse(-1));
    }

    /** What happens when the supplied "identity" is not a true identity. */
    public static void demonstrateIdentityIssues() {
        System.out.println("\n=== Identity Element Issues ===");
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);

        // 10 is NOT an identity for addition: every parallel chunk starts
        // from 10, so the extra 10s leak into the final result.
        int skewedSum = values.parallelStream().reduce(10, Integer::sum);
        System.out.println("Sum with wrong identity 10: " + skewedSum);
        System.out.println("This is incorrect because 10 is added multiple times!");

        System.out.println("\nWhy wrong identity fails:");
        System.out.println("With identity 10 and numbers [1,2,3]:");
        System.out.println("Correct: 1 + 2 + 3 = 6");
        System.out.println("With wrong identity: 10 + 1 + 10 + 2 + 10 + 3 = 36");

        // An empty stream simply yields the identity itself.
        List<Integer> none = Collections.emptyList();
        int emptyTotal = none.stream().reduce(0, Integer::sum);
        System.out.println("Empty stream with identity 0: " + emptyTotal);
    }
}
Combiner Function in Parallel Reduction
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;
/**
 * Explores the combiner argument of the three-argument reduce(): when it
 * runs, how often, and why the two-argument form cannot count elements.
 */
public class CombinerFunction {

    public static void main(String[] args) {
        System.out.println("=== Combiner Function ===");
        demonstrateCombinerBasics();
        demonstrateCombinerInAction();
    }

    /** Logs accumulator vs combiner steps of a parallel string reduce. */
    public static void demonstrateCombinerBasics() {
        System.out.println("\n=== Combiner Basics ===");
        List<String> words = Arrays.asList("Hello", "World", "Java", "Streams");
        String result = words.parallelStream()
                .reduce("",
                        // Accumulator: combines a partial result with one element.
                        (identity, element) -> {
                            System.out.println(Thread.currentThread().getName() +
                                    " - Accumulator: '" + identity + "' + '" + element + "'");
                            return identity + " " + element;
                        },
                        // Combiner: merges two partial results from subtasks.
                        (partial1, partial2) -> {
                            System.out.println(Thread.currentThread().getName() +
                                    " - Combiner: '" + partial1 + "' + '" + partial2 + "'");
                            return partial1 + partial2;
                        });
        System.out.println("Final result: " + result.trim());
    }

    /** Counts accumulator vs combiner invocations during a parallel count. */
    public static void demonstrateCombinerInAction() {
        System.out.println("\n=== Combiner in Action ===");
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        AtomicInteger accumulatorCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();
        long count = numbers.parallelStream()
                .reduce(0L,
                        // Accumulator: counts one element.
                        (acc, element) -> {
                            accumulatorCalls.incrementAndGet();
                            return acc + 1;
                        },
                        // Combiner: adds two partial counts.
                        (count1, count2) -> {
                            combinerCalls.incrementAndGet();
                            return count1 + count2;
                        });
        System.out.println("Count: " + count);
        System.out.println("Accumulator calls: " + accumulatorCalls.get());
        System.out.println("Combiner calls: " + combinerCalls.get());
        System.out.println("Total operations: " + (accumulatorCalls.get() + combinerCalls.get()));

        // BUG FIX: the original wrote reduce(0L, (acc, element) -> acc + 1),
        // which does not compile — the two-argument reduce on a
        // Stream<Integer> needs an Integer identity and a
        // BinaryOperator<Integer>. With the types fixed, the example still
        // shows the logic error: the single lambda serves as BOTH
        // accumulator and combiner, and "partial + 1" is not how two partial
        // counts merge, so the parallel result undercounts.
        Integer badCount = numbers.parallelStream()
                .reduce(0, (acc, element) -> acc + 1);
        System.out.println("Bad count (may be wrong): " + badCount);
    }
}
Custom Reducers
Building Custom Reduction Operations
import java.util.*;
import java.util.stream.*;
import java.util.function.*;
public class CustomReducers {
// Custom reducer for statistical summary
static class StatsReducer implements
Function<Double, StatsReducer>,
BinaryOperator<StatsReducer> {
private double sum = 0;
private double min = Double.MAX_VALUE;
private double max = Double.MIN_VALUE;
private long count = 0;
// Function: accumulate a value
@Override
public StatsReducer apply(Double value) {
this.sum += value;
this.min = Math.min(this.min, value);
this.max = Math.max(this.max, value);
this.count++;
return this;
}
// BinaryOperator: combine two reducers
@Override
public StatsReducer apply(StatsReducer s1, StatsReducer s2) {
StatsReducer result = new StatsReducer();
result.sum = s1.sum + s2.sum;
result.min = Math.min(s1.min, s2.min);
result.max = Math.max(s1.max, s2.max);
result.count = s1.count + s2.count;
return result;
}
public double getAverage() {
return count == 0 ? 0 : sum / count;
}
@Override
public String toString() {
return String.format("Stats{count=%d, sum=%.2f, min=%.2f, max=%.2f, avg=%.2f}",
count, sum, min, max, getAverage());
}
}
// Generic custom reducer builder
static class CustomReducer<T, R> {
private final Supplier<R> supplier;
private final BiFunction<R, T, R> accumulator;
private final BinaryOperator<R> combiner;
public CustomReducer(Supplier<R> supplier,
BiFunction<R, T, R> accumulator,
BinaryOperator<R> combiner) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
}
public R reduce(Stream<T> stream) {
return stream.reduce(
supplier.get(),
accumulator,
combiner
);
}
}
public static void main(String[] args) {
System.out.println("=== Custom Reducers ===");
List<Double> numbers = Arrays.asList(1.5, 2.5, 3.5, 4.5, 5.5);
// Using custom StatsReducer
StatsReducer stats = numbers.parallelStream()
.reduce(
new StatsReducer(),
StatsReducer::apply,
StatsReducer::apply
);
System.out.println("Custom stats: " + stats);
// Using generic custom reducer
CustomReducer<Double, double[]> varianceReducer = new CustomReducer<>(
() -> new double[3], // [sum, sumOfSquares, count]
(acc, value) -> {
acc[0] += value; // sum
acc[1] += value * value; // sum of squares
acc[2]++; // count
return acc;
},
(acc1, acc2) -> {
acc1[0] += acc2[0];
acc1[1] += acc2[1];
acc1[2] += acc2[2];
return acc1;
}
);
double[] varianceResult = varianceReducer.reduce(numbers.parallelStream());
double mean = varianceResult[0] / varianceResult[2];
double variance = (varianceResult[1] / varianceResult[2]) - (mean * mean);
System.out.println("Variance: " + variance);
System.out.println("Standard deviation: " + Math.sqrt(variance));
demonstrateComplexCustomReducer();
}
public static void demonstrateComplexCustomReducer() {
System.out.println("\n=== Complex Custom Reducer ===");
List<String> sentences = Arrays.asList(
"Hello world",
"Java streams are powerful",
"Parallel reduction works well",
"Functional programming is interesting"
);
// Reducer for text statistics
class TextStats {
int totalWords = 0;
int totalChars = 0;
int totalSentences = 0;
String longestWord = "";
TextStats accumulate(String sentence) {
String[] words = sentence.split(" ");
this.totalWords += words.length;
this.totalChars += sentence.replace(" ", "").length();
this.totalSentences++;
// Find longest word in this sentence
String sentenceLongest = Arrays.stream(words)
.max(Comparator.comparing(String::length))
.orElse("");
if (sentenceLongest.length() > this.longestWord.length()) {
this.longestWord = sentenceLongest;
}
return this;
}
TextStats combine(TextStats other) {
TextStats result = new TextStats();
result.totalWords = this.totalWords + other.totalWords;
result.totalChars = this.totalChars + other.totalChars;
result.totalSentences = this.totalSentences + other.totalSentences;
result.longestWord = this.longestWord.length() > other.longestWord.length()
? this.longestWord : other.longestWord;
return result;
}
@Override
public String toString() {
return String.format(
"TextStats{sentences=%d, words=%d, chars=%d, avgWordLen=%.2f, longest='%s'}",
totalSentences, totalWords, totalChars,
totalWords > 0 ? (double)totalChars / totalWords : 0,
longestWord
);
}
}
TextStats textStats = sentences.parallelStream()
.reduce(
new TextStats(),
TextStats::accumulate,
TextStats::combine
);
System.out.println("Text statistics: " + textStats);
}
}
Performance Considerations
Parallel Reduction Performance
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
/**
 * Rough (single-run, no-warm-up) benchmarks of sequential vs parallel
 * reduction. Numbers are indicative only; use JMH for real measurements.
 */
public class ParallelReductionPerformance {

    public static void main(String[] args) {
        System.out.println("=== Parallel Reduction Performance ===");
        benchmarkSimpleOperations();
        benchmarkComplexOperations();
        analyzePerformanceFactors();
    }

    /** Benchmarks cheap associative reductions over a large boxed list. */
    public static void benchmarkSimpleOperations() {
        System.out.println("\n=== Simple Operations Benchmark ===");
        int size = 10_000_000;
        List<Integer> numbers = new ArrayList<>(size);
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            numbers.add(random.nextInt(1000));
        }
        benchmarkOperation("Sum", numbers,
                stream -> stream.reduce(0, Integer::sum));
        benchmarkOperation("Max", numbers,
                stream -> stream.reduce(Integer::max));
        benchmarkOperation("Count", numbers,
                stream -> stream.reduce(0, (acc, el) -> acc + 1, Integer::sum));
    }

    /** Benchmarks reductions with non-trivial per-element cost. */
    public static void benchmarkComplexOperations() {
        System.out.println("\n=== Complex Operations Benchmark ===");
        // BUG FIX: the original used 1_000_000 strings. Concatenation via
        // reduce() recopies the growing prefix on every step (O(n^2)
        // characters), so the sequential pass at that size effectively never
        // finishes. A much smaller input still demonstrates the contrast.
        int size = 10_000;
        List<String> strings = new ArrayList<>(size);
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            strings.add(generateRandomString(random, 10));
        }
        benchmarkOperation("String Concatenation", strings,
                stream -> stream.reduce("", (a, b) -> a + b));
        benchmarkOperation("Length Sum", strings,
                stream -> stream.mapToInt(String::length).sum());
    }

    /** Shows how input size and the common pool affect parallel speedup. */
    public static void analyzePerformanceFactors() {
        System.out.println("\n=== Performance Factors Analysis ===");
        int[] sizes = {1000, 10000, 100000, 1000000};
        for (int size : sizes) {
            List<Integer> numbers = IntStream.range(0, size)
                    .boxed()
                    .collect(Collectors.toList());
            long seqTime = measureTime(() ->
                    numbers.stream().reduce(0, Integer::sum));
            long parTime = measureTime(() ->
                    numbers.parallelStream().reduce(0, Integer::sum));
            System.out.printf("Size %7d: Sequential=%6dns, Parallel=%6dns, Speedup=%.2fx%n",
                    size, seqTime, parTime, (double) seqTime / parTime);
        }
        System.out.println("\nThread pool impact:");
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " +
                ForkJoinPool.commonPool().getParallelism());
    }

    /**
     * Times one sequential and one parallel run of {@code operation}.
     * BUG FIX: Function lives in java.util.function, which this listing
     * never imports; the fully qualified name keeps it self-contained.
     */
    private static <T> void benchmarkOperation(String name, List<T> data,
            java.util.function.Function<Stream<T>, Object> operation) {
        long seqTime = measureTime(() -> operation.apply(data.stream()));
        long parTime = measureTime(() -> operation.apply(data.parallelStream()));
        System.out.printf("%-20s: Sequential=%8dns, Parallel=%8dns, Speedup=%6.2fx%n",
                name, seqTime, parTime, (double) seqTime / parTime);
    }

    /** Wall-clock nanoseconds of a single run. */
    private static long measureTime(Runnable operation) {
        long start = System.nanoTime();
        operation.run();
        return System.nanoTime() - start;
    }

    /** Random lowercase ASCII string of the given length. */
    private static String generateRandomString(Random random, int length) {
        return random.ints('a', 'z' + 1)
                .limit(length)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();
    }
}
Optimization Techniques
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;
/**
 * Optimization techniques for reductions: explicit chunking, primitive
 * streams instead of boxed ones, and mutable collect() instead of
 * immutable reduce() for string assembly.
 */
public class ReductionOptimization {

    public static void main(String[] args) {
        System.out.println("=== Reduction Optimization Techniques ===");
        demonstrateChunkingOptimization();
        demonstratePrimitiveOptimization();
        demonstrateCollectorOptimization();
    }

    /** Compares a plain parallel reduce with an explicitly chunked one. */
    public static void demonstrateChunkingOptimization() {
        System.out.println("\n=== Chunking Optimization ===");
        List<Integer> numbers = IntStream.range(1, 1001)
                .boxed()
                .collect(Collectors.toList());

        // Naive approach - fine for small datasets
        long naiveTime = measureTime(() ->
                numbers.parallelStream()
                        .reduce(0, Integer::sum));

        // Chunked approach - better for large datasets.
        // BUG FIX: measureTime() takes a Runnable, so this lambda body may
        // not "return" a value (the original did, and failed to compile).
        // The sum is captured in a local; only the elapsed time matters.
        long chunkedTime = measureTime(() -> {
            int chunkSize = 100;
            int chunks = (numbers.size() + chunkSize - 1) / chunkSize;
            int chunkedSum = IntStream.range(0, chunks)
                    .parallel()
                    .map(i -> numbers.subList(i * chunkSize,
                                    Math.min((i + 1) * chunkSize, numbers.size()))
                            .stream()
                            .reduce(0, Integer::sum))
                    .sum(); // result intentionally unused
        });
        System.out.println("Naive approach: " + naiveTime + "ns");
        System.out.println("Chunked approach: " + chunkedTime + "ns");
        System.out.println("Improvement: " + (double) naiveTime / chunkedTime + "x");
    }

    /** Boxed list vs primitive array: same reduction, very different cost. */
    public static void demonstratePrimitiveOptimization() {
        System.out.println("\n=== Primitive Stream Optimization ===");
        int size = 10_000_000;
        // Object stream source.
        List<Integer> objectList = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            objectList.add(i);
        }
        // Primitive array source.
        int[] primitiveArray = new int[size];
        for (int i = 0; i < size; i++) {
            primitiveArray[i] = i;
        }
        long objectTime = measureTime(() ->
                objectList.parallelStream()
                        .reduce(0, Integer::sum));
        long primitiveTime = measureTime(() ->
                Arrays.stream(primitiveArray).parallel().sum());
        System.out.println("Object stream reduction: " + objectTime + "ns");
        System.out.println("Primitive stream reduction: " + primitiveTime + "ns");
        System.out.println("Primitive improvement: " + (double) objectTime / primitiveTime + "x");
    }

    /** Immutable reduce vs mutable collect for assembling strings. */
    public static void demonstrateCollectorOptimization() {
        System.out.println("\n=== Collector vs Reduce Optimization ===");
        // BUG FIX: shrunk from 100_000 words — reduce()-based concatenation
        // is O(n^2) in characters, and with the 20 timed iterations below
        // the original size made this demo run for minutes.
        List<String> words = IntStream.range(0, 10000)
                .mapToObj(i -> "word" + i)
                .collect(Collectors.toList());

        // reduce() allocates a brand-new intermediate String per step.
        long reduceTime = measureTime(() ->
                words.parallelStream()
                        .reduce("", (a, b) -> a + " " + b));

        // collect() appends into mutable StringBuilders and merges them.
        // (Note: no separator here, so the RESULT differs from the reduce
        // version; the comparison is about timing, not output.)
        long collectTime = measureTime(() ->
                words.parallelStream()
                        .collect(StringBuilder::new,
                                StringBuilder::append,
                                StringBuilder::append)
                        .toString());
        System.out.println("Reduce time: " + reduceTime + "ns");
        System.out.println("Collect time: " + collectTime + "ns");
        System.out.println("Collect improvement: " + (double) reduceTime / collectTime + "x");
    }

    /** Average nanoseconds over 10 runs, after 10 warm-up runs. */
    private static long measureTime(Runnable operation) {
        for (int i = 0; i < 10; i++) {
            operation.run();
        }
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            operation.run();
        }
        return (System.nanoTime() - start) / 10;
    }
}
Common Patterns
Real-World Reduction Patterns
import java.util.*;
import java.util.stream.*;
import java.time.*;
import java.time.temporal.ChronoUnit;
/**
 * Real-world parallel aggregation patterns.
 *
 * <p>FIX: every pattern in the original passed a mutable container
 * ({@code new HashMap<>()} or a fresh stats object) as the <em>identity</em>
 * of {@code Stream.reduce} and mutated it inside the accumulator. In a
 * parallel stream the single identity instance is handed to every worker
 * thread, so the accumulators race on it (lost updates or
 * {@code ConcurrentModificationException}) — exactly the "stateful
 * operations in reducers" pitfall this tutorial warns about. Mutable
 * accumulation belongs in {@code collect}: {@code Collectors.groupingBy},
 * {@code Collectors.toMap} with a non-mutating merge, or the three-argument
 * {@code collect(supplier, accumulator, combiner)}, all of which give each
 * worker its own container.
 */
public class CommonReductionPatterns {
    public static void main(String[] args) {
        System.out.println("=== Common Reduction Patterns ===");
        financialAnalysisPattern();
        dataAggregationPattern();
        statisticalAnalysisPattern();
        textProcessingPattern();
    }

    /** Category-wise and per-day transaction totals via parallel-safe grouping. */
    public static void financialAnalysisPattern() {
        System.out.println("\n=== Financial Analysis Pattern ===");
        class Transaction {
            final double amount;
            final String category;
            final LocalDate date;
            Transaction(double amount, String category, LocalDate date) {
                this.amount = amount;
                this.category = category;
                this.date = date;
            }
        }
        List<Transaction> transactions = Arrays.asList(
            new Transaction(100.0, "Food", LocalDate.now().minusDays(1)),
            new Transaction(50.0, "Transport", LocalDate.now().minusDays(1)),
            new Transaction(200.0, "Shopping", LocalDate.now().minusDays(2)),
            new Transaction(75.0, "Food", LocalDate.now().minusDays(3)),
            new Transaction(150.0, "Entertainment", LocalDate.now().minusDays(3))
        );
        // Pattern: category-wise sum. groupingBy gives each worker thread its
        // own map and merges them, unlike reduce with a shared identity map.
        Map<String, Double> categorySums = transactions.parallelStream()
            .collect(Collectors.groupingBy(
                t -> t.category,
                Collectors.summingDouble(t -> t.amount)));
        System.out.println("Category sums: " + categorySums);
        // Pattern: daily totals — same shape, different classifier.
        Map<LocalDate, Double> dailyTotals = transactions.parallelStream()
            .collect(Collectors.groupingBy(
                t -> t.date,
                Collectors.summingDouble(t -> t.amount)));
        System.out.println("Daily totals: " + dailyTotals);
    }

    /** Per-sensor min/max/average/count aggregation. */
    public static void dataAggregationPattern() {
        System.out.println("\n=== Data Aggregation Pattern ===");
        class SensorReading {
            final String sensorId;
            final double value;
            final Instant timestamp;
            SensorReading(String sensorId, double value, Instant timestamp) {
                this.sensorId = sensorId;
                this.value = value;
                this.timestamp = timestamp;
            }
        }
        List<SensorReading> readings = Arrays.asList(
            new SensorReading("S1", 25.5, Instant.now()),
            new SensorReading("S1", 26.0, Instant.now().plusSeconds(10)),
            new SensorReading("S2", 30.2, Instant.now()),
            new SensorReading("S2", 29.8, Instant.now().plusSeconds(10)),
            new SensorReading("S1", 24.9, Instant.now().plusSeconds(20))
        );
        // Running statistics for one sensor.
        class SensorStats {
            double sum = 0;
            // FIX: sentinels were Double.MAX_VALUE / Double.MIN_VALUE. MIN_VALUE
            // is the smallest POSITIVE double, so negative readings would never
            // update max. The infinities are the correct extremes.
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;
            int count = 0;
            // Folds one reading into this instance (only ever called on a
            // thread-local, freshly created object below).
            SensorStats accumulate(SensorReading reading) {
                this.sum += reading.value;
                this.min = Math.min(this.min, reading.value);
                this.max = Math.max(this.max, reading.value);
                this.count++;
                return this;
            }
            // Non-mutating merge: returns a new object, so it is safe as a
            // toMap merge function in a parallel stream.
            SensorStats combine(SensorStats other) {
                SensorStats result = new SensorStats();
                result.sum = this.sum + other.sum;
                result.min = Math.min(this.min, other.min);
                result.max = Math.max(this.max, other.max);
                result.count = this.count + other.count;
                return result;
            }
            double getAverage() { return count > 0 ? sum / count : 0; }
            @Override
            public String toString() {
                return String.format("avg=%.2f, min=%.2f, max=%.2f, count=%d",
                    getAverage(), min, max, count);
            }
        }
        // Pattern: per-key statistics. Each reading seeds its own stats object;
        // toMap merges per-key partials with the non-mutating combine.
        Map<String, SensorStats> sensorStats = readings.parallelStream()
            .collect(Collectors.toMap(
                r -> r.sensorId,
                r -> new SensorStats().accumulate(r),
                SensorStats::combine));
        System.out.println("Sensor statistics: " + sensorStats);
    }

    /** Mean / standard deviation / extremes in a single parallel pass. */
    public static void statisticalAnalysisPattern() {
        System.out.println("\n=== Statistical Analysis Pattern ===");
        List<Double> data = Arrays.asList(1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5);
        // Mutable accumulator for the three-argument collect: each worker gets
        // its own instance from the supplier, so mutation is thread-confined.
        class CompleteStats {
            double sum = 0;
            double sumSquares = 0;
            // FIX: correct extremes (see SensorStats note on MIN_VALUE).
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;
            long count = 0;
            // Folds one value into this thread-local accumulator.
            void accumulate(double value) {
                this.sum += value;
                this.sumSquares += value * value;
                this.min = Math.min(this.min, value);
                this.max = Math.max(this.max, value);
                this.count++;
            }
            // Merges another partial into this one (collect's combiner).
            void merge(CompleteStats other) {
                this.sum += other.sum;
                this.sumSquares += other.sumSquares;
                this.min = Math.min(this.min, other.min);
                this.max = Math.max(this.max, other.max);
                this.count += other.count;
            }
            double getMean() { return count > 0 ? sum / count : 0; }
            // Population variance via E[X^2] - E[X]^2.
            double getVariance() {
                return count > 0 ? (sumSquares / count) - Math.pow(getMean(), 2) : 0;
            }
            double getStdDev() { return Math.sqrt(getVariance()); }
            @Override
            public String toString() {
                return String.format(
                    "Stats{n=%d, mean=%.2f, stdDev=%.2f, min=%.2f, max=%.2f}",
                    count, getMean(), getStdDev(), min, max);
            }
        }
        // Pattern: whole-summary mutable reduction with per-thread containers.
        CompleteStats stats = data.parallelStream()
            .collect(CompleteStats::new,
                     CompleteStats::accumulate,
                     CompleteStats::merge);
        System.out.println("Complete statistics: " + stats);
    }

    /** Word-frequency analysis across several documents. */
    public static void textProcessingPattern() {
        System.out.println("\n=== Text Processing Pattern ===");
        List<String> documents = Arrays.asList(
            "The quick brown fox jumps over the lazy dog",
            "Java streams provide functional operations",
            "Parallel reduction enables efficient processing",
            "The fox and the dog are both animals"
        );
        // Pattern: word counts. toMap seeds each word with 1 and merges counts
        // with Integer::sum — parallel-safe, unlike reduce on a shared map.
        Map<String, Integer> wordFrequency = documents.parallelStream()
            .flatMap(doc -> Arrays.stream(doc.toLowerCase().split("\\W+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toMap(
                word -> word,
                word -> 1,
                Integer::sum));
        // Show top 5 most frequent words
        System.out.println("Top 5 frequent words:");
        wordFrequency.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
            .limit(5)
            .forEach(entry ->
                System.out.println("  " + entry.getKey() + ": " + entry.getValue()));
    }
}
Summary
Key Points for Parallel Stream Reduction:
- Associativity is Critical: The reduction operation must be associative for correct parallel results
- Identity Element: Must satisfy `identity op a = a` for all elements
- Combiner Function: Required for parallel streams to combine partial results
- Performance: Parallel reduction shines with large datasets and expensive operations
- Primitive Streams: Use for better performance with primitive data
When to Use Parallel Reduction:
- Large datasets (> 1000 elements)
- CPU-intensive operations
- Associative operations available
- Sufficient available processors
Common Pitfalls:
- Non-associative operations (subtraction, division)
- Wrong identity elements
- Stateful operations in reducers
- Overhead for small datasets
Parallel stream reduction is a powerful tool for leveraging multi-core processors, but requires careful consideration of operation properties and performance characteristics.