Stream Reduction in Java Parallel Streams: Complete Guide

Table of Contents

  1. Introduction to Reduction
  2. Basic Reduction Operations
  3. Parallel Reduction Fundamentals
  4. Associativity Requirement
  5. Identity and Combiner
  6. Custom Reducers
  7. Performance Considerations
  8. Common Patterns

Introduction to Reduction

Reduction is the process of combining all elements of a stream into a single result. In parallel streams, reduction becomes more complex as elements are processed concurrently and results need to be combined.

Sequential vs Parallel Reduction

import java.util.*;
import java.util.stream.*;
/**
 * Introductory example: the same reduction run sequentially and in parallel,
 * plus a traced string-concatenation reduction that makes the combination
 * order visible.
 */
public class ReductionIntroduction {
/**
 * Sums a small list both sequentially and in parallel, prints single-shot
 * nanosecond timings (indicative only — no JIT warm-up), and verifies the
 * two results agree.
 */
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("=== Sequential vs Parallel Reduction ===");
// Sequential reduction: elements folded left-to-right on the calling thread.
long seqStart = System.nanoTime();
int sequentialSum = numbers.stream()
.reduce(0, (a, b) -> a + b);
long seqTime = System.nanoTime() - seqStart;
// Parallel reduction: the source is split, partial sums are computed
// concurrently, then merged. Addition is associative with identity 0,
// so the total matches the sequential result.
long parStart = System.nanoTime();
int parallelSum = numbers.parallelStream()
.reduce(0, (a, b) -> a + b);
long parTime = System.nanoTime() - parStart;
System.out.println("Sequential sum: " + sequentialSum + " in " + seqTime + "ns");
System.out.println("Parallel sum: " + parallelSum + " in " + parTime + "ns");
System.out.println("Results equal: " + (sequentialSum == parallelSum));
demonstrateReductionProcess();
}
/**
 * Traces every accumulator invocation of a string-concatenation reduce,
 * first sequentially, then in parallel. In the two-argument parallel form
 * the SAME lambda also merges partial results, so the parallel trace shows
 * a different (nondeterministic) combination order — and may show extra
 * separator spaces where two non-empty partials are joined.
 */
public static void demonstrateReductionProcess() {
System.out.println("\n=== Reduction Process ===");
List<String> words = Arrays.asList("Hello", "World", "Stream", "API");
// String concatenation reduction
String concatenated = words.stream()
.reduce("", (partial, element) -> {
String result = partial + " " + element;
System.out.println("Combining: '" + partial + "' + '" + element + "' = '" + result + "'");
return result;
});
// trim() drops the leading space introduced by the "" identity.
System.out.println("Final result: " + concatenated.trim());
// Parallel version shows different combination order
System.out.println("\nParallel reduction (order may vary):");
String parallelConcatenated = words.parallelStream()
.reduce("", (partial, element) -> {
String result = partial + " " + element;
System.out.println("Combining: '" + partial + "' + '" + element + "' = '" + result + "'");
return result;
});
System.out.println("Final result: " + parallelConcatenated.trim());
}
}

Basic Reduction Operations

Standard Reduction Operations

import java.util.*;
import java.util.stream.*;
/**
 * Catalogue of the standard reduce() shapes: identity+accumulator,
 * accumulator-only (returns Optional), and the three-argument
 * identity+accumulator+combiner form.
 */
public class BasicReductionOperations {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<String> words = Arrays.asList("apple", "banana", "cherry", "date");
System.out.println("=== Basic Reduction Operations ===");
// 1. Sum reduction
int sum = numbers.stream()
.reduce(0, Integer::sum);
System.out.println("Sum: " + sum);
// 2. Product reduction
int product = numbers.stream()
.reduce(1, (a, b) -> a * b);
System.out.println("Product: " + product);
// 3. Max reduction (no identity -> Optional result for empty streams)
Optional<Integer> max = numbers.stream()
.reduce(Integer::max);
System.out.println("Max: " + max.orElse(-1));
// 4. Min reduction
Optional<Integer> min = numbers.stream()
.reduce(Integer::min);
System.out.println("Min: " + min.orElse(-1));
// 5. String concatenation
String concatenated = words.stream()
.reduce("", (a, b) -> a + " " + b);
System.out.println("Concatenated: " + concatenated.trim());
// 6. Count using reduction. Three-argument reduce: the accumulator
// ignores the element's value and adds 1; the combiner (used only by
// parallel streams) adds partial counts. Widened to long on assignment.
long count = numbers.stream()
.reduce(0, (acc, element) -> acc + 1, Integer::sum);
System.out.println("Count via reduction: " + count);
demonstrateThreeArgumentReduce();
}
/**
 * Traces accumulator vs combiner calls of a three-argument reduce on a
 * parallel stream. Each partial built by the accumulator starts with a
 * space, so the combiner can join partials with plain concatenation and
 * still produce correctly spaced output.
 */
public static void demonstrateThreeArgumentReduce() {
System.out.println("\n=== Three-Argument Reduce ===");
List<String> words = Arrays.asList("Hello", "World", "Stream", "API");
// Three-argument reduce: identity, accumulator, combiner
String result = words.parallelStream()
.reduce("",
(partial, element) -> {
System.out.println(Thread.currentThread().getName() + 
" - Accumulator: " + partial + " + " + element);
return partial + " " + element;
},
(part1, part2) -> {
System.out.println(Thread.currentThread().getName() + 
" - Combiner: " + part1 + " + " + part2);
return part1 + part2;
});
System.out.println("Final: " + result.trim());
}
}

Reduction with Primitive Streams

import java.util.*;
import java.util.stream.*;
/**
 * Reduction with primitive streams: built-in terminal operations (sum,
 * average, max, min), a custom reduce, and one-pass summary statistics.
 */
public class PrimitiveStreamReduction {
    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println("=== Primitive Stream Reduction ===");
        // Built-in IntStream reductions — no boxing involved.
        int sum = IntStream.of(numbers).sum();
        OptionalDouble average = IntStream.of(numbers).average();
        OptionalInt max = IntStream.of(numbers).max();
        OptionalInt min = IntStream.of(numbers).min();
        System.out.println("Sum: " + sum);
        System.out.println("Average: " + average.orElse(0));
        System.out.println("Max: " + max.orElse(-1));
        System.out.println("Min: " + min.orElse(-1));
        // Custom reduction: product of 1..5, widened to long first so larger
        // ranges would not silently overflow int.
        long product = IntStream.range(1, 6)
                .asLongStream()
                .reduce(1, (a, b) -> a * b);
        System.out.println("Product 1-5: " + product);
        // Count/sum/min/max/average computed in a single pass.
        IntSummaryStatistics stats = IntStream.of(numbers)
                .summaryStatistics();
        System.out.println("\nStatistics: " + stats);
        demonstrateParallelPrimitiveReduction();
    }

    /**
     * Times a sequential vs a parallel sum of 1..1000.
     *
     * <p>A fresh {@code IntStream} is built for each run: streams are
     * single-use, and the original version reused one instance (calling
     * {@code sum()} and then {@code parallel().sum()} on the same stream),
     * which throws {@code IllegalStateException} on the second terminal
     * operation.
     */
    public static void demonstrateParallelPrimitiveReduction() {
        System.out.println("\n=== Parallel Primitive Reduction ===");
        long seqStart = System.nanoTime();
        int sequentialSum = IntStream.range(1, 1001).sum();
        long seqTime = System.nanoTime() - seqStart;
        long parStart = System.nanoTime();
        int parallelSum = IntStream.range(1, 1001).parallel().sum();
        long parTime = System.nanoTime() - parStart;
        System.out.println("Sequential sum: " + sequentialSum + " in " + seqTime + "ns");
        System.out.println("Parallel sum: " + parallelSum + " in " + parTime + "ns");
        System.out.println("Speedup: " + (double) seqTime / parTime + "x");
    }
}

Parallel Reduction Fundamentals

How Parallel Reduction Works

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
/**
 * Makes the fork/join mechanics of parallel reduce visible: which thread
 * accumulates which element, and which thread merges which partials.
 */
public class ParallelReductionMechanics {
public static void main(String[] args) {
System.out.println("=== Parallel Reduction Mechanics ===");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Visualize parallel reduction process.
// NOTE: the first lambda parameter is named "identity" but it is really
// the running partial result for the current subtask; only its initial
// value is the identity (0).
int result = numbers.parallelStream()
.reduce(0,
(identity, element) -> {
String thread = Thread.currentThread().getName();
int sum = identity + element;
System.out.println(thread + " - Accumulate: " + identity + " + " + element + " = " + sum);
return sum;
},
(partial1, partial2) -> {
String thread = Thread.currentThread().getName();
int combined = partial1 + partial2;
System.out.println(thread + " - Combine: " + partial1 + " + " + partial2 + " = " + combined);
return combined;
});
System.out.println("Final result: " + result);
demonstrateForkJoinBehavior();
}
/**
 * Counts how many accumulator invocations land on each worker thread.
 * Uses a ConcurrentHashMap because the accumulator lambda runs
 * concurrently on multiple common-pool threads (and the caller thread).
 */
public static void demonstrateForkJoinBehavior() {
System.out.println("\n=== ForkJoin Pool Behavior ===");
// Create a larger dataset to see proper parallelization
List<Integer> largeList = IntStream.range(1, 101)
.boxed()
.collect(Collectors.toList());
System.out.println("ForkJoinPool parallelism: " + 
ForkJoinPool.commonPool().getParallelism());
Map<String, Integer> threadUsage = new ConcurrentHashMap<>();
// Side effect inside the accumulator is for demonstration only —
// production reductions should keep accumulators side-effect-free.
int sum = largeList.parallelStream()
.reduce(0,
(acc, num) -> {
String thread = Thread.currentThread().getName();
threadUsage.merge(thread, 1, Integer::sum);
return acc + num;
},
(a, b) -> a + b);
System.out.println("Thread usage count:");
threadUsage.forEach((thread, count) -> 
System.out.println("  " + thread + ": " + count + " operations"));
System.out.println("Total sum: " + sum);
// Gauss formula for 1..100 as a correctness cross-check.
System.out.println("Expected sum: " + (100 * 101 / 2));
}
}

Reduction with Complex Objects

import java.util.*;
import java.util.stream.*;
/**
 * Immutable value object describing a single monetary transaction.
 */
class Transaction {
    private final String id;
    private final double amount;
    private final String currency;

    /**
     * @param id       unique transaction identifier
     * @param amount   transaction amount, in {@code currency} units
     * @param currency currency code, e.g. "USD"
     */
    public Transaction(String id, double amount, String currency) {
        this.id = id;
        this.amount = amount;
        this.currency = currency;
    }

    public String getId() {
        return id;
    }

    public double getAmount() {
        return amount;
    }

    public String getCurrency() {
        return currency;
    }

    /** Debug-friendly rendering with the amount fixed to two decimals. */
    @Override
    public String toString() {
        return String.format("Transaction{id='%s', amount=%.2f, currency='%s'}", 
id, amount, currency);
    }
}
/**
 * Accumulated statistics over a set of transactions. Designed for
 * Stream.reduce: {@link #accumulate} folds one transaction into a NEW
 * summary and {@link #combine} merges two partials into a NEW summary, so
 * the shared reduction identity is never mutated.
 */
class FinancialSummary {
    private double totalAmount;
    private int transactionCount;
    private double maxAmount;
    private double minAmount;

    /** Empty summary — the reduction identity. */
    public FinancialSummary() {
        this.totalAmount = 0;
        this.transactionCount = 0;
        // Infinity sentinels. The original used Double.MIN_VALUE as the max
        // sentinel, but MIN_VALUE is the smallest POSITIVE double, so it
        // would incorrectly win Math.max against any negative amount.
        // NEGATIVE_INFINITY / POSITIVE_INFINITY are the true identities for
        // max and min respectively.
        this.maxAmount = Double.NEGATIVE_INFINITY;
        this.minAmount = Double.POSITIVE_INFINITY;
    }

    /**
     * @param total accumulated amount
     * @param count number of transactions folded in
     * @param max   largest single amount seen
     * @param min   smallest single amount seen
     */
    public FinancialSummary(double total, int count, double max, double min) {
        this.totalAmount = total;
        this.transactionCount = count;
        this.maxAmount = max;
        this.minAmount = min;
    }

    /** Merges two partial summaries into a new one (parallel combiner). */
    public FinancialSummary combine(FinancialSummary other) {
        return new FinancialSummary(
                this.totalAmount + other.totalAmount,
                this.transactionCount + other.transactionCount,
                Math.max(this.maxAmount, other.maxAmount),
                Math.min(this.minAmount, other.minAmount)
        );
    }

    /** Folds a single transaction into a new summary (accumulator). */
    public FinancialSummary accumulate(Transaction transaction) {
        return new FinancialSummary(
                this.totalAmount + transaction.getAmount(),
                this.transactionCount + 1,
                Math.max(this.maxAmount, transaction.getAmount()),
                Math.min(this.minAmount, transaction.getAmount())
        );
    }

    @Override
    public String toString() {
        return String.format(
                "Summary{total=%.2f, count=%d, max=%.2f, min=%.2f, avg=%.2f}",
                totalAmount, transactionCount, maxAmount, minAmount, 
transactionCount > 0 ? totalAmount / transactionCount : 0
        );
    }
}
/**
 * Reduction over domain objects: the immutable three-argument reduce with
 * FinancialSummary, and an equivalent mutable reduction via collect().
 */
public class ComplexObjectReduction {
    public static void main(String[] args) {
        System.out.println("=== Complex Object Reduction ===");
        List<Transaction> transactions = Arrays.asList(
                new Transaction("T1", 100.0, "USD"),
                new Transaction("T2", 250.5, "USD"),
                new Transaction("T3", 75.25, "USD"),
                new Transaction("T4", 300.0, "USD"),
                new Transaction("T5", 50.0, "USD"),
                new Transaction("T6", 425.75, "USD")
        );
        // Sequential reduction with custom object: identity + accumulator + combiner.
        FinancialSummary sequentialSummary = transactions.stream()
                .reduce(
                        new FinancialSummary(),
                        FinancialSummary::accumulate,
                        FinancialSummary::combine
                );
        System.out.println("Sequential: " + sequentialSummary);
        // Parallel reduction is safe because accumulate/combine return fresh
        // objects instead of mutating the shared identity instance.
        FinancialSummary parallelSummary = transactions.parallelStream()
                .reduce(
                        new FinancialSummary(),
                        FinancialSummary::accumulate,
                        FinancialSummary::combine
                );
        System.out.println("Parallel: " + parallelSummary);
        demonstrateMutableReduction();
    }

    /**
     * Mutable reduction with collect(), often cheaper than reduce() because
     * no intermediate objects are allocated per element.
     *
     * <p>FinancialSummary exposes no mutators (its fields are private — the
     * original version tried to assign them directly and did not compile),
     * so the mutable container here is a plain
     * {@code double[]{total, count, max, min}} which is converted to a
     * FinancialSummary at the end. collect() creates one container per
     * parallel subtask, so in-place mutation is safe.
     */
    public static void demonstrateMutableReduction() {
        System.out.println("\n=== Mutable Reduction ===");
        List<Transaction> transactions = Arrays.asList(
                new Transaction("T1", 100.0, "USD"),
                new Transaction("T2", 250.5, "USD"),
                new Transaction("T3", 75.25, "USD")
        );
        double[] acc = transactions.parallelStream()
                .collect(
                        // Supplier: a fresh container per subtask
                        () -> new double[] {0, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY},
                        // Accumulator (mutates the subtask-local container)
                        (a, transaction) -> {
                            a[0] += transaction.getAmount();
                            a[1]++;
                            a[2] = Math.max(a[2], transaction.getAmount());
                            a[3] = Math.min(a[3], transaction.getAmount());
                        },
                        // Combiner (merges two containers)
                        (a1, a2) -> {
                            a1[0] += a2[0];
                            a1[1] += a2[1];
                            a1[2] = Math.max(a1[2], a2[2]);
                            a1[3] = Math.min(a1[3], a2[3]);
                        }
                );
        FinancialSummary collectedSummary =
                new FinancialSummary(acc[0], (int) acc[1], acc[2], acc[3]);
        System.out.println("Collected: " + collectedSummary);
    }
}

Associativity Requirement

Understanding Associativity

import java.util.*;
import java.util.stream.*;
/**
 * Shows why reduce's accumulator must be associative: only associative
 * operations produce identical results sequentially and in parallel.
 */
public class AssociativityRequirement {
    public static void main(String[] args) {
        System.out.println("=== Associativity Requirement ===");
        // Associative operation: (a + b) + c = a + (b + c)
        demonstrateAssociativeOperations();
        demonstrateNonAssociativeOperations();
    }

    /** Addition, multiplication and max — safe under parallel reduction. */
    public static void demonstrateAssociativeOperations() {
        System.out.println("\n=== Associative Operations ===");
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // Addition is associative
        int sumSequential = numbers.stream()
                .reduce(0, (a, b) -> a + b);
        int sumParallel = numbers.parallelStream()
                .reduce(0, (a, b) -> a + b);
        System.out.println("Addition - Sequential: " + sumSequential + ", Parallel: " + sumParallel);
        // Multiplication is associative
        int productSequential = numbers.stream()
                .reduce(1, (a, b) -> a * b);
        int productParallel = numbers.parallelStream()
                .reduce(1, (a, b) -> a * b);
        System.out.println("Multiplication - Sequential: " + productSequential + ", Parallel: " + productParallel);
        // Max is associative
        Optional<Integer> maxSequential = numbers.stream()
                .reduce(Integer::max);
        Optional<Integer> maxParallel = numbers.parallelStream()
                .reduce(Integer::max);
        // Fixed: the original referenced an undeclared variable "parallel".
        System.out.println("Max - Sequential: " + maxSequential + ", Parallel: " + maxParallel);
    }

    /** Subtraction and division — parallel results are wrong. */
    public static void demonstrateNonAssociativeOperations() {
        System.out.println("\n=== Non-Associative Operations ===");
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // Subtraction is NOT associative: (a - b) - c ≠ a - (b - c)
        System.out.println("Subtraction example:");
        System.out.println("(1 - 2) - 3 = " + ((1 - 2) - 3));
        System.out.println("1 - (2 - 3) = " + (1 - (2 - 3)));
        // Using a non-associative operation in a parallel stream gives
        // wrong (and run-to-run unstable) results.
        int subtractSequential = numbers.stream()
                .reduce(0, (a, b) -> a - b);
        int subtractParallel = numbers.parallelStream()
                .reduce(0, (a, b) -> a - b);
        System.out.println("Subtraction - Sequential: " + subtractSequential + ", Parallel: " + subtractParallel);
        System.out.println("WARNING: Parallel result is incorrect due to non-associativity!");
        // Division is also NOT associative
        System.out.println("\nDivision example:");
        System.out.println("(8 / 4) / 2 = " + ((8 / 4) / 2));
        System.out.println("8 / (4 / 2) = " + (8 / (4 / 2)));
    }

    /**
     * Generic associativity spot-check for one sample triple. BiFunction is
     * fully qualified because this example only imports java.util.* and
     * java.util.stream.* — java.util.function is not star-imported here.
     */
    public static void verifyAssociativity() {
        System.out.println("\n=== Verifying Associativity ===");
        // Test if an operation is associative
        java.util.function.BiFunction<Integer, Integer, Integer> operation =
                (a, b) -> a + b; // Change to test other operations
        int a = 5, b = 3, c = 2;
        int leftAssociative = operation.apply(operation.apply(a, b), c);
        int rightAssociative = operation.apply(a, operation.apply(b, c));
        System.out.println("Operation: " + operation);
        System.out.println("(" + a + " op " + b + ") op " + c + " = " + leftAssociative);
        System.out.println(a + " op (" + b + " op " + c + ") = " + rightAssociative);
        System.out.println("Is associative: " + (leftAssociative == rightAssociative));
    }
}

Associativity in Custom Operations

import java.util.*;
import java.util.function.*;
import java.util.stream.*;
/**
 * Verifies that a custom combine operation (weighted-average accumulation)
 * is associative, making it safe for parallel reduction.
 */
public class CustomOperationAssociativity {
// Custom operation: weighted average accumulator.
// Holds the weighted SUM and total WEIGHT rather than the average itself,
// because sums compose associatively while averages do not.
static class WeightedAverage {
double sum;
double weight;
WeightedAverage() {
this(0, 0);
}
WeightedAverage(double sum, double weight) {
this.sum = sum;
this.weight = weight;
}
// This operation MUST be associative for parallel reduction.
// It returns a NEW instance, so the shared reduce identity is never mutated.
WeightedAverage combine(WeightedAverage other) {
return new WeightedAverage(
this.sum + other.sum,
this.weight + other.weight
);
}
// Final average; 0 for the empty (zero-weight) case to avoid NaN.
double getAverage() {
return weight == 0 ? 0 : sum / weight;
}
}
public static void main(String[] args) {
System.out.println("=== Custom Operation Associativity ===");
// Test data: (value, weight) pairs
List<double[]> weightedValues = Arrays.asList(
new double[]{10, 1},  // value 10, weight 1
new double[]{20, 2},  // value 20, weight 2  
new double[]{30, 3},  // value 30, weight 3
new double[]{40, 4}   // value 40, weight 4
);
// Weighted average calculation. The accumulator folds value*weight into
// the sum and the weight into the total weight; the combiner merges partials.
WeightedAverage sequentialResult = weightedValues.stream()
.reduce(
new WeightedAverage(),
(acc, pair) -> new WeightedAverage(
acc.sum + pair[0] * pair[1], 
acc.weight + pair[1]
),
WeightedAverage::combine
);
WeightedAverage parallelResult = weightedValues.parallelStream()
.reduce(
new WeightedAverage(),
(acc, pair) -> new WeightedAverage(
acc.sum + pair[0] * pair[1],
acc.weight + pair[1]
),
WeightedAverage::combine
);
System.out.println("Sequential weighted average: " + sequentialResult.getAverage());
System.out.println("Parallel weighted average: " + parallelResult.getAverage());
// Exact == comparison is safe here: the inputs are small integers whose
// sums are represented exactly in double.
System.out.println("Results equal: " + 
(sequentialResult.getAverage() == parallelResult.getAverage()));
verifyCustomAssociativity();
}
/**
 * Spot-checks (A⊕B)⊕C == A⊕(B⊕C) for the combine operation on one
 * sample triple.
 */
public static void verifyCustomAssociativity() {
System.out.println("\n=== Verifying Custom Operation Associativity ===");
// Test if our weighted average operation is associative
WeightedAverage a = new WeightedAverage(10 * 1, 1); // value 10, weight 1
WeightedAverage b = new WeightedAverage(20 * 2, 2); // value 20, weight 2
WeightedAverage c = new WeightedAverage(30 * 3, 3); // value 30, weight 3
WeightedAverage left = a.combine(b).combine(c);
WeightedAverage right = a.combine(b.combine(c));
System.out.println("(A combine B) combine C: " + left.getAverage());
System.out.println("A combine (B combine C): " + right.getAverage());
System.out.println("Operation is associative: " + 
(left.getAverage() == right.getAverage()));
}
}

Identity and Combiner

Identity Element Requirements

import java.util.*;
import java.util.stream.*;
/**
 * Demonstrates the identity requirement of reduce: the identity value must
 * satisfy op(identity, x) == x, otherwise parallel execution (which seeds
 * every subtask with the identity) produces wrong results.
 */
public class IdentityElement {
public static void main(String[] args) {
System.out.println("=== Identity Element Requirements ===");
demonstrateIdentityProperties();
demonstrateIdentityIssues();
}
/** Correct identities: 0 for +, 1 for *, "" for concat; Optional for max. */
public static void demonstrateIdentityProperties() {
System.out.println("\n=== Proper Identity Elements ===");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// Addition: identity is 0 (a + 0 = a)
int sumIdentity = numbers.parallelStream()
.reduce(0, (a, b) -> a + b);
System.out.println("Sum with identity 0: " + sumIdentity);
// Multiplication: identity is 1 (a * 1 = a)
int productIdentity = numbers.parallelStream()
.reduce(1, (a, b) -> a * b);
System.out.println("Product with identity 1: " + productIdentity);
// String concatenation: identity is "" (s + "" = s)
List<String> words = Arrays.asList("Hello", "World");
String concatIdentity = words.parallelStream()
.reduce("", (a, b) -> a + " " + b);
System.out.println("Concatenation with identity '': " + concatIdentity.trim());
// Max operation: no proper identity for unbounded ints, so use the
// identity-free overload that returns an Optional instead.
Optional<Integer> maxNoIdentity = numbers.parallelStream()
.reduce(Integer::max);
System.out.println("Max without identity: " + maxNoIdentity.orElse(-1));
}
/**
 * Shows what goes wrong with a non-identity seed: each parallel subtask
 * starts from it, so it is folded in once per subtask instead of once.
 */
public static void demonstrateIdentityIssues() {
System.out.println("\n=== Identity Element Issues ===");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// Wrong identity for sum (should be 0, not 10)
int wrongIdentitySum = numbers.parallelStream()
.reduce(10, (a, b) -> a + b);
System.out.println("Sum with wrong identity 10: " + wrongIdentitySum);
System.out.println("This is incorrect because 10 is added multiple times!");
// Demonstrating why wrong identity causes problems
System.out.println("\nWhy wrong identity fails:");
System.out.println("With identity 10 and numbers [1,2,3]:");
System.out.println("Correct: 1 + 2 + 3 = 6");
System.out.println("With wrong identity: 10 + 1 + 10 + 2 + 10 + 3 = 36");
// Empty stream with identity: reduce simply returns the identity.
List<Integer> emptyList = Arrays.asList();
int emptySum = emptyList.stream()
.reduce(0, (a, b) -> a + b);
System.out.println("Empty stream with identity 0: " + emptySum);
}
}

Combiner Function in Parallel Reduction

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;
/**
 * Demonstrates the combiner's role in the three-argument reduce:
 * accumulators fold elements into per-subtask partials, combiners merge
 * those partials.
 */
public class CombinerFunction {
    public static void main(String[] args) {
        System.out.println("=== Combiner Function ===");
        demonstrateCombinerBasics();
        demonstrateCombinerInAction();
    }

    /** Traces which thread runs each accumulator and combiner invocation. */
    public static void demonstrateCombinerBasics() {
        System.out.println("\n=== Combiner Basics ===");
        List<String> words = Arrays.asList("Hello", "World", "Java", "Streams");
        // Three-argument reduce with explicit combiner
        String result = words.parallelStream()
                .reduce("",
                        // Accumulator: folds one element into a partial result
                        (identity, element) -> {
                            System.out.println(Thread.currentThread().getName() + 
" - Accumulator: '" + identity + "' + '" + element + "'");
                            return identity + " " + element;
                        },
                        // Combiner: merges two partial results
                        (partial1, partial2) -> {
                            System.out.println(Thread.currentThread().getName() + 
" - Combiner: '" + partial1 + "' + '" + partial2 + "'");
                            return partial1 + partial2;
                        });
        System.out.println("Final result: " + result.trim());
    }

    /**
     * Counts accumulator vs combiner invocations, then shows why the
     * two-argument reduce cannot express counting correctly in parallel.
     */
    public static void demonstrateCombinerInAction() {
        System.out.println("\n=== Combiner in Action ===");
        // Count elements using reduce with combiner
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        AtomicInteger accumulatorCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();
        // Three-argument reduce allows a result type (Long) different from
        // the element type (Integer).
        long count = numbers.parallelStream()
                .reduce(0L,
                        // Accumulator: count one element
                        (acc, element) -> {
                            accumulatorCalls.incrementAndGet();
                            return acc + 1;
                        },
                        // Combiner: sum two partial counts
                        (count1, count2) -> {
                            combinerCalls.incrementAndGet();
                            return count1 + count2;
                        });
        System.out.println("Count: " + count);
        System.out.println("Accumulator calls: " + accumulatorCalls.get());
        System.out.println("Combiner calls: " + combinerCalls.get());
        System.out.println("Total operations: " + (accumulatorCalls.get() + combinerCalls.get()));
        // The two-argument reduce has no separate combiner: its accumulator
        // is a BinaryOperator<Integer> and is ALSO used to merge partial
        // results, so "(acc, element) -> acc + 1" drops every right-hand
        // partial count and the parallel result is typically wrong.
        // (Fixed: the identity must be an Integer here; the original passed
        // 0L, which does not type-check on a Stream<Integer> and did not
        // compile — no exception is thrown at runtime, the result is simply
        // incorrect.)
        Integer badCount = numbers.parallelStream()
                .reduce(0, (acc, element) -> acc + 1);
        System.out.println("Bad count (may be wrong): " + badCount);
    }
}

Custom Reducers

Building Custom Reduction Operations

import java.util.*;
import java.util.stream.*;
import java.util.function.*;
public class CustomReducers {
// Custom reducer for statistical summary
static class StatsReducer implements 
Function<Double, StatsReducer>,
BinaryOperator<StatsReducer> {
private double sum = 0;
private double min = Double.MAX_VALUE;
private double max = Double.MIN_VALUE;
private long count = 0;
// Function: accumulate a value
@Override
public StatsReducer apply(Double value) {
this.sum += value;
this.min = Math.min(this.min, value);
this.max = Math.max(this.max, value);
this.count++;
return this;
}
// BinaryOperator: combine two reducers
@Override
public StatsReducer apply(StatsReducer s1, StatsReducer s2) {
StatsReducer result = new StatsReducer();
result.sum = s1.sum + s2.sum;
result.min = Math.min(s1.min, s2.min);
result.max = Math.max(s1.max, s2.max);
result.count = s1.count + s2.count;
return result;
}
public double getAverage() {
return count == 0 ? 0 : sum / count;
}
@Override
public String toString() {
return String.format("Stats{count=%d, sum=%.2f, min=%.2f, max=%.2f, avg=%.2f}",
count, sum, min, max, getAverage());
}
}
// Generic custom reducer builder
static class CustomReducer<T, R> {
private final Supplier<R> supplier;
private final BiFunction<R, T, R> accumulator;
private final BinaryOperator<R> combiner;
public CustomReducer(Supplier<R> supplier, 
BiFunction<R, T, R> accumulator,
BinaryOperator<R> combiner) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
}
public R reduce(Stream<T> stream) {
return stream.reduce(
supplier.get(),
accumulator,
combiner
);
}
}
public static void main(String[] args) {
System.out.println("=== Custom Reducers ===");
List<Double> numbers = Arrays.asList(1.5, 2.5, 3.5, 4.5, 5.5);
// Using custom StatsReducer
StatsReducer stats = numbers.parallelStream()
.reduce(
new StatsReducer(),
StatsReducer::apply,
StatsReducer::apply
);
System.out.println("Custom stats: " + stats);
// Using generic custom reducer
CustomReducer<Double, double[]> varianceReducer = new CustomReducer<>(
() -> new double[3], // [sum, sumOfSquares, count]
(acc, value) -> {
acc[0] += value;          // sum
acc[1] += value * value;  // sum of squares
acc[2]++;                 // count
return acc;
},
(acc1, acc2) -> {
acc1[0] += acc2[0];
acc1[1] += acc2[1];
acc1[2] += acc2[2];
return acc1;
}
);
double[] varianceResult = varianceReducer.reduce(numbers.parallelStream());
double mean = varianceResult[0] / varianceResult[2];
double variance = (varianceResult[1] / varianceResult[2]) - (mean * mean);
System.out.println("Variance: " + variance);
System.out.println("Standard deviation: " + Math.sqrt(variance));
demonstrateComplexCustomReducer();
}
public static void demonstrateComplexCustomReducer() {
System.out.println("\n=== Complex Custom Reducer ===");
List<String> sentences = Arrays.asList(
"Hello world",
"Java streams are powerful",
"Parallel reduction works well",
"Functional programming is interesting"
);
// Reducer for text statistics
class TextStats {
int totalWords = 0;
int totalChars = 0;
int totalSentences = 0;
String longestWord = "";
TextStats accumulate(String sentence) {
String[] words = sentence.split(" ");
this.totalWords += words.length;
this.totalChars += sentence.replace(" ", "").length();
this.totalSentences++;
// Find longest word in this sentence
String sentenceLongest = Arrays.stream(words)
.max(Comparator.comparing(String::length))
.orElse("");
if (sentenceLongest.length() > this.longestWord.length()) {
this.longestWord = sentenceLongest;
}
return this;
}
TextStats combine(TextStats other) {
TextStats result = new TextStats();
result.totalWords = this.totalWords + other.totalWords;
result.totalChars = this.totalChars + other.totalChars;
result.totalSentences = this.totalSentences + other.totalSentences;
result.longestWord = this.longestWord.length() > other.longestWord.length() 
? this.longestWord : other.longestWord;
return result;
}
@Override
public String toString() {
return String.format(
"TextStats{sentences=%d, words=%d, chars=%d, avgWordLen=%.2f, longest='%s'}",
totalSentences, totalWords, totalChars,
totalWords > 0 ? (double)totalChars / totalWords : 0,
longestWord
);
}
}
TextStats textStats = sentences.parallelStream()
.reduce(
new TextStats(),
TextStats::accumulate,
TextStats::combine
);
System.out.println("Text statistics: " + textStats);
}
}

Performance Considerations

Parallel Reduction Performance

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
/**
 * Micro-benchmarks comparing sequential and parallel reduction.
 *
 * <p>Timings are single-shot System.nanoTime measurements with no JIT
 * warm-up — only the relative ordering is meaningful.
 */
public class ParallelReductionPerformance {
    public static void main(String[] args) {
        System.out.println("=== Parallel Reduction Performance ===");
        benchmarkSimpleOperations();
        benchmarkComplexOperations();
        analyzePerformanceFactors();
    }

    /** Benchmarks cheap per-element reductions on 10M boxed ints. */
    public static void benchmarkSimpleOperations() {
        System.out.println("\n=== Simple Operations Benchmark ===");
        int size = 10_000_000;
        List<Integer> numbers = new ArrayList<>(size);
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            numbers.add(random.nextInt(1000));
        }
        // Sum benchmark
        benchmarkOperation("Sum", numbers, 
stream -> stream.reduce(0, Integer::sum));
        // Max benchmark
        benchmarkOperation("Max", numbers,
                stream -> stream.reduce(Integer::max));
        // Count benchmark: three-argument reduce — accumulator counts,
        // combiner adds partial counts.
        benchmarkOperation("Count", numbers,
                stream -> stream.reduce(0, (acc, el) -> acc + 1, Integer::sum));
    }

    /** Benchmarks string-heavy reductions. */
    public static void benchmarkComplexOperations() {
        System.out.println("\n=== Complex Operations Benchmark ===");
        // 10k elements instead of the original 1M: the concatenation
        // benchmark below copies partial strings quadratically, so 1M
        // elements would effectively never finish.
        int size = 10_000;
        List<String> strings = new ArrayList<>(size);
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            strings.add(generateRandomString(random, 10));
        }
        // String concatenation benchmark (deliberately naive — quadratic)
        benchmarkOperation("String Concatenation", strings,
                stream -> stream.reduce("", (a, b) -> a + b));
        // Length sum benchmark
        benchmarkOperation("Length Sum", strings,
                stream -> stream.mapToInt(String::length).sum());
    }

    /** Shows how input size and common-pool parallelism affect speedup. */
    public static void analyzePerformanceFactors() {
        System.out.println("\n=== Performance Factors Analysis ===");
        // Analyze different data sizes
        int[] sizes = {1000, 10000, 100000, 1000000};
        for (int size : sizes) {
            List<Integer> numbers = IntStream.range(0, size)
                    .boxed()
                    .collect(Collectors.toList());
            long seqTime = measureTime(() -> 
numbers.stream().reduce(0, Integer::sum));
            long parTime = measureTime(() -> 
numbers.parallelStream().reduce(0, Integer::sum));
            System.out.printf("Size %7d: Sequential=%6dns, Parallel=%6dns, Speedup=%.2fx%n",
size, seqTime, parTime, (double)seqTime/parTime);
        }
        // Analyze thread pool impact
        System.out.println("\nThread pool impact:");
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + 
ForkJoinPool.commonPool().getParallelism());
    }

    /**
     * Runs the given stream operation once sequentially and once in
     * parallel and prints both timings. The parameter type is fully
     * qualified because this example only imports java.util.* /
     * java.util.stream.* / java.util.concurrent.* — the original's bare
     * {@code Function} reference did not compile.
     */
    private static <T> void benchmarkOperation(String name, List<T> data, 
java.util.function.Function<Stream<T>, Object> operation) {
        long seqTime = measureTime(() -> operation.apply(data.stream()));
        long parTime = measureTime(() -> operation.apply(data.parallelStream()));
        System.out.printf("%-20s: Sequential=%8dns, Parallel=%8dns, Speedup=%6.2fx%n",
name, seqTime, parTime, (double)seqTime/parTime);
    }

    /** Wall-clock nanoseconds for a single run of the operation. */
    private static long measureTime(Runnable operation) {
        long start = System.nanoTime();
        operation.run();
        return System.nanoTime() - start;
    }

    /** Random lowercase string of the given length. */
    private static String generateRandomString(Random random, int length) {
        return random.ints('a', 'z' + 1)
                .limit(length)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();
    }
}

Optimization Techniques

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;
/**
 * Optimization techniques for reductions: manual chunking, primitive
 * streams instead of boxed ones, and collect() instead of reduce() for
 * string building.
 */
public class ReductionOptimization {
    public static void main(String[] args) {
        System.out.println("=== Reduction Optimization Techniques ===");
        demonstrateChunkingOptimization();
        demonstratePrimitiveOptimization();
        demonstrateCollectorOptimization();
    }

    /** Compares a plain parallel reduce against manual chunking. */
    public static void demonstrateChunkingOptimization() {
        System.out.println("\n=== Chunking Optimization ===");
        List<Integer> numbers = IntStream.range(1, 1001)
                .boxed()
                .collect(Collectors.toList());
        // Naive approach - fine for small datasets
        long naiveTime = measureTime(() -> 
numbers.parallelStream()
                        .reduce(0, Integer::sum));
        // Chunked approach: sum fixed-size sublists in parallel, then sum
        // the per-chunk results. NOTE: measureTime takes a Runnable, so
        // this block lambda must not return a value (the original's
        // "return ... .sum();" did not compile); the result is kept in a
        // local and sanity-checked instead.
        long chunkedTime = measureTime(() -> {
            int chunkSize = 100;
            // Ceiling division so a trailing partial chunk is included.
            int chunks = (numbers.size() + chunkSize - 1) / chunkSize;
            int total = IntStream.range(0, chunks)
                    .parallel()
                    .map(i -> numbers.subList(i * chunkSize, 
Math.min((i + 1) * chunkSize, numbers.size()))
                            .stream()
                            .reduce(0, Integer::sum))
                    .sum();
            if (total != 500500) { // sum of 1..1000
                throw new IllegalStateException("Chunked sum mismatch: " + total);
            }
        });
        System.out.println("Naive approach: " + naiveTime + "ns");
        System.out.println("Chunked approach: " + chunkedTime + "ns");
        System.out.println("Improvement: " + (double)naiveTime/chunkedTime + "x");
    }

    /** Boxed Integer reduction vs primitive IntStream sum on 10M values. */
    public static void demonstratePrimitiveOptimization() {
        System.out.println("\n=== Primitive Stream Optimization ===");
        int size = 10_000_000;
        // Object stream
        List<Integer> objectList = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            objectList.add(i);
        }
        // Primitive array
        int[] primitiveArray = new int[size];
        for (int i = 0; i < size; i++) {
            primitiveArray[i] = i;
        }
        long objectTime = measureTime(() -> 
objectList.parallelStream()
                        .reduce(0, Integer::sum));
        long primitiveTime = measureTime(() -> 
Arrays.stream(primitiveArray).parallel().sum());
        System.out.println("Object stream reduction: " + objectTime + "ns");
        System.out.println("Primitive stream reduction: " + primitiveTime + "ns");
        System.out.println("Primitive improvement: " + (double)objectTime/primitiveTime + "x");
    }

    /** Immutable reduce vs mutable StringBuilder collect for string joins. */
    public static void demonstrateCollectorOptimization() {
        System.out.println("\n=== Collector vs Reduce Optimization ===");
        // 10k words instead of the original 100k: the reduce variant copies
        // partial strings quadratically, so larger inputs take minutes.
        List<String> words = IntStream.range(0, 10_000)
                .mapToObj(i -> "word" + i)
                .collect(Collectors.toList());
        // Using reduce (creates many intermediate strings)
        long reduceTime = measureTime(() -> 
words.parallelStream()
                        .reduce("", (a, b) -> a + " " + b));
        // Using collector (more efficient for mutable reduction)
        long collectTime = measureTime(() -> 
words.parallelStream()
                        .collect(StringBuilder::new, 
StringBuilder::append,
StringBuilder::append)
                        .toString());
        System.out.println("Reduce time: " + reduceTime + "ns");
        System.out.println("Collect time: " + collectTime + "ns");
        System.out.println("Collect improvement: " + (double)reduceTime/collectTime + "x");
    }

    /**
     * Average wall-clock nanoseconds over 10 runs, after 10 warm-up runs
     * to let the JIT compile the hot path.
     */
    private static long measureTime(Runnable operation) {
        // Warm up
        for (int i = 0; i < 10; i++) {
            operation.run();
        }
        // Measure
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            operation.run();
        }
        return (System.nanoTime() - start) / 10;
    }
}

Common Patterns

Real-World Reduction Patterns

import java.util.*;
import java.util.stream.*;
import java.time.*;
import java.time.temporal.ChronoUnit;
/**
 * Real-world parallel reduction patterns.
 *
 * Fixes from the original listing: every map-building pattern used
 * {@code reduce(new HashMap<>(), ...)} with an accumulator that mutated the
 * identity object. In a parallel stream the identity instance is shared by
 * every worker thread, so concurrent mutation races and can silently corrupt
 * the result — exactly the "stateful operations in reducers" pitfall this
 * article warns about. The correct tool for mutable reduction is
 * {@code collect} ({@code Collectors.groupingBy} / {@code Collector.of}),
 * which gives each thread its own container and merges them safely.
 */
public class CommonReductionPatterns {

    public static void main(String[] args) {
        System.out.println("=== Common Reduction Patterns ===");
        financialAnalysisPattern();
        dataAggregationPattern();
        statisticalAnalysisPattern();
        textProcessingPattern();
    }

    /** Pattern: summing transaction amounts per key (category, then date). */
    public static void financialAnalysisPattern() {
        System.out.println("\n=== Financial Analysis Pattern ===");
        class Transaction {
            final double amount;
            final String category;
            final LocalDate date;
            Transaction(double amount, String category, LocalDate date) {
                this.amount = amount;
                this.category = category;
                this.date = date;
            }
        }
        List<Transaction> transactions = Arrays.asList(
                new Transaction(100.0, "Food", LocalDate.now().minusDays(1)),
                new Transaction(50.0, "Transport", LocalDate.now().minusDays(1)),
                new Transaction(200.0, "Shopping", LocalDate.now().minusDays(2)),
                new Transaction(75.0, "Food", LocalDate.now().minusDays(3)),
                new Transaction(150.0, "Entertainment", LocalDate.now().minusDays(3))
        );
        // Category-wise sum: groupingBy + summingDouble is the parallel-safe
        // equivalent of the merge-based HashMap reduce.
        Map<String, Double> categorySums = transactions.parallelStream()
                .collect(Collectors.groupingBy(t -> t.category,
                        Collectors.summingDouble(t -> t.amount)));
        System.out.println("Category sums: " + categorySums);
        // Daily totals: identical pattern keyed by date.
        Map<LocalDate, Double> dailyTotals = transactions.parallelStream()
                .collect(Collectors.groupingBy(t -> t.date,
                        Collectors.summingDouble(t -> t.amount)));
        System.out.println("Daily totals: " + dailyTotals);
    }

    /** Pattern: per-sensor running statistics via a custom downstream Collector. */
    public static void dataAggregationPattern() {
        System.out.println("\n=== Data Aggregation Pattern ===");
        class SensorReading {
            final String sensorId;
            final double value;
            final Instant timestamp;
            SensorReading(String sensorId, double value, Instant timestamp) {
                this.sensorId = sensorId;
                this.value = value;
                this.timestamp = timestamp;
            }
        }
        List<SensorReading> readings = Arrays.asList(
                new SensorReading("S1", 25.5, Instant.now()),
                new SensorReading("S1", 26.0, Instant.now().plusSeconds(10)),
                new SensorReading("S2", 30.2, Instant.now()),
                new SensorReading("S2", 29.8, Instant.now().plusSeconds(10)),
                new SensorReading("S1", 24.9, Instant.now().plusSeconds(20))
        );
        class SensorStats {
            double sum = 0;
            // Infinity sentinels: the original used Double.MIN_VALUE for max,
            // which is the smallest POSITIVE double and breaks on negative data.
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;
            int count = 0;
            SensorStats accumulate(SensorReading reading) {
                this.sum += reading.value;
                this.min = Math.min(this.min, reading.value);
                this.max = Math.max(this.max, reading.value);
                this.count++;
                return this;
            }
            SensorStats combine(SensorStats other) {
                SensorStats result = new SensorStats();
                result.sum = this.sum + other.sum;
                result.min = Math.min(this.min, other.min);
                result.max = Math.max(this.max, other.max);
                result.count = this.count + other.count;
                return result;
            }
            double getAverage() { return count > 0 ? sum / count : 0; }
            @Override
            public String toString() {
                return String.format("avg=%.2f, min=%.2f, max=%.2f, count=%d",
                        getAverage(), min, max, count);
            }
        }
        // Collector.of hands each worker thread a fresh SensorStats, so there
        // is no shared mutable state; combine() merges partial results.
        Map<String, SensorStats> sensorStats = readings.parallelStream()
                .collect(Collectors.groupingBy(r -> r.sensorId,
                        Collector.of(SensorStats::new,
                                (acc, reading) -> acc.accumulate(reading),
                                SensorStats::combine)));
        System.out.println("Sensor statistics: " + sensorStats);
    }

    /** Pattern: complete statistical summary (mean/stddev/min/max) in one pass. */
    public static void statisticalAnalysisPattern() {
        System.out.println("\n=== Statistical Analysis Pattern ===");
        List<Double> data = Arrays.asList(1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5);
        class CompleteStats {
            double sum = 0;
            double sumSquares = 0;
            double min = Double.POSITIVE_INFINITY;
            double max = Double.NEGATIVE_INFINITY;  // was MIN_VALUE — wrong for negatives
            long count = 0;
            CompleteStats accumulate(double value) {
                this.sum += value;
                this.sumSquares += value * value;
                this.min = Math.min(this.min, value);
                this.max = Math.max(this.max, value);
                this.count++;
                return this;
            }
            CompleteStats combine(CompleteStats other) {
                CompleteStats result = new CompleteStats();
                result.sum = this.sum + other.sum;
                result.sumSquares = this.sumSquares + other.sumSquares;
                result.min = Math.min(this.min, other.min);
                result.max = Math.max(this.max, other.max);
                result.count = this.count + other.count;
                return result;
            }
            double getMean() { return count > 0 ? sum / count : 0; }
            double getVariance() {
                // E[X^2] - E[X]^2; single-pass but can lose precision on
                // large, tightly-clustered data (fine for a demo).
                return count > 0 ? (sumSquares / count) - Math.pow(getMean(), 2) : 0;
            }
            double getStdDev() { return Math.sqrt(getVariance()); }
            @Override
            public String toString() {
                return String.format(
                        "Stats{n=%d, mean=%.2f, stdDev=%.2f, min=%.2f, max=%.2f}",
                        count, getMean(), getStdDev(), min, max);
            }
        }
        // The original reduced with a shared mutable identity (accumulate
        // mutates `this`); Collector.of gives each thread its own accumulator.
        CompleteStats stats = data.parallelStream()
                .collect(Collector.of(CompleteStats::new,
                        (acc, value) -> acc.accumulate(value),
                        CompleteStats::combine));
        System.out.println("Complete statistics: " + stats);
    }

    /** Pattern: word-frequency analysis across documents. */
    public static void textProcessingPattern() {
        System.out.println("\n=== Text Processing Pattern ===");
        List<String> documents = Arrays.asList(
                "The quick brown fox jumps over the lazy dog",
                "Java streams provide functional operations",
                "Parallel reduction enables efficient processing",
                "The fox and the dog are both animals"
        );
        // groupingBy + summingInt replaces the racy HashMap-identity reduce.
        Map<String, Integer> wordFrequency = documents.parallelStream()
                .flatMap(doc -> Arrays.stream(doc.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word,
                        Collectors.summingInt(word -> 1)));
        // Show top 5 most frequent words.
        System.out.println("Top 5 frequent words:");
        wordFrequency.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(5)
                .forEach(entry ->
                        System.out.println("  " + entry.getKey() + ": " + entry.getValue()));
    }
}

Summary

Key Points for Parallel Stream Reduction:

  1. Associativity is Critical: The reduction operation must be associative for correct parallel results
  2. Identity Element: Must satisfy `identity op a == a` for every element `a` of the stream
  3. Combiner Function: Required for parallel streams to combine partial results
  4. Performance: Parallel reduction shines with large datasets and expensive operations
  5. Primitive Streams: Use for better performance with primitive data

When to Use Parallel Reduction:

  • Large datasets (> 1000 elements)
  • CPU-intensive operations
  • Associative operations available
  • Sufficient available processors

Common Pitfalls:

  • Non-associative operations (subtraction, division)
  • Wrong identity elements
  • Stateful operations in reducers
  • Overhead for small datasets

Parallel stream reduction is a powerful tool for leveraging multi-core processors, but requires careful consideration of operation properties and performance characteristics.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper