Parallel Streams in Java

Introduction

Imagine you have a big pile of papers to sort. You could sort it yourself, one sheet at a time, or you could get several friends to help, each working on a different part of the pile at the same time. Parallel streams in Java are like having those friends: they split your work across multiple CPU cores, which can make large, expensive jobs finish much sooner.

Parallel streams allow you to process collections concurrently, leveraging the power of multi-core processors without dealing with complex threading code.


What are Parallel Streams?

Parallel streams are streams that divide their elements into multiple chunks and process each chunk independently in different threads, then combine the results. They're designed to make parallel processing simple and accessible.

Key Concepts:

  • Automatic Parallelism: Java handles thread creation and management
  • Fork-Join Framework: Uses Java's ForkJoinPool behind the scenes
  • Data Splitting: Automatically divides data into smaller chunks
  • Result Combining: Merges results from different threads
  • Thread Safety: The framework coordinates its own worker threads, but your lambdas must still avoid shared mutable state
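
To make the split, process, and combine idea concrete, here is a minimal sketch that does by hand what a parallel stream arranges for you. It runs on a single thread so the mechanics stay visible; the class and method names are made up for illustration, and only Spliterator.trySplit() and forEachRemaining() are real JDK calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SplitAndCombineSketch {
    // Sums whatever elements the given spliterator still covers.
    static long sumChunk(Spliterator<Integer> chunk) {
        long[] total = {0};
        chunk.forEachRemaining(n -> total[0] += n);
        return total[0];
    }

    // Splits the data once, sums each part separately, then combines the partial sums.
    static long splitSum(List<Integer> data) {
        Spliterator<Integer> rest = new ArrayList<>(data).spliterator();
        Spliterator<Integer> firstPart = rest.trySplit(); // null when the source is too small to split
        long partA = (firstPart == null) ? 0 : sumChunk(firstPart);
        long partB = sumChunk(rest);
        return partA + partB; // the "combine" step
    }

    public static void main(String[] args) {
        System.out.println(splitSum(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
    }
}
```

A real parallel stream keeps splitting recursively and hands the chunks to ForkJoinPool worker threads, but the shape of the work is the same.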

Code Explanation with Examples

Example 1: Creating Parallel Streams

import java.util.*;
import java.util.stream.*;

public class ParallelStreamCreation {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Method 1: Using parallelStream() on Collection
        List<Integer> result1 = numbers.parallelStream()
            .map(n -> n * 2)
            .collect(Collectors.toList());
        System.out.println("Using parallelStream(): " + result1);

        // Method 2: Converting sequential stream to parallel
        List<Integer> result2 = numbers.stream()
            .parallel()  // Convert to parallel
            .map(n -> n * 2)
            .collect(Collectors.toList());
        System.out.println("Using stream().parallel(): " + result2);

        // Method 3: Using parallel() on Arrays
        // Note: forEach on a parallel stream prints in no guaranteed order
        int[] array = {1, 2, 3, 4, 5};
        Arrays.stream(array)
            .parallel()
            .map(n -> n * 2)
            .forEach(n -> System.out.print(n + " "));
    }
}
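
If you are ever unsure which mode a pipeline will run in, isParallel() tells you. The short sketch below (class and method names are illustrative) also shows that parallel() and sequential() set a flag for the whole pipeline, so the last call wins.

```java
import java.util.Arrays;
import java.util.List;

class StreamModeSketch {
    // parallel() followed by sequential(): the later call decides the mode.
    static boolean lastCallWins(List<Integer> data) {
        return data.stream().parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        System.out.println(numbers.stream().isParallel());            // false
        System.out.println(numbers.stream().parallel().isParallel()); // true
        System.out.println(lastCallWins(numbers));                    // false
    }
}
```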

Example 2: Performance Comparison - Sequential vs Parallel

import java.util.*;
import java.util.stream.*;
import java.time.*;

public class PerformanceComparison {
    // A method that simulates some work
    public static int process(int number) {
        try {
            // Simulate processing time
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        return number * 2;
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
            .boxed()
            .collect(Collectors.toList());

        // Sequential processing
        Instant sequentialStart = Instant.now();
        List<Integer> sequentialResult = numbers.stream()
            .map(PerformanceComparison::process)
            .collect(Collectors.toList());
        Duration sequentialTime = Duration.between(sequentialStart, Instant.now());

        // Parallel processing
        Instant parallelStart = Instant.now();
        List<Integer> parallelResult = numbers.parallelStream()
            .map(PerformanceComparison::process)
            .collect(Collectors.toList());
        Duration parallelTime = Duration.between(parallelStart, Instant.now());

        System.out.println("Sequential time: " + sequentialTime.toMillis() + " ms");
        System.out.println("Parallel time: " + parallelTime.toMillis() + " ms");
        System.out.println("Speedup: " + (sequentialTime.toMillis() / (double) parallelTime.toMillis()) + "x");
    }
}

Sample output (exact numbers depend on core count and system load):

Sequential time: 1015 ms
Parallel time: 125 ms
Speedup: 8.12x
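
A single timed run like the one above is noisy, because the first pass also pays for class loading and JIT compilation. As a rough sketch only (a dedicated harness such as JMH is the more reliable tool), the helper below runs a task a few times untimed and then reports the best of several timed runs. The helper names and the prime-counting workload are assumptions chosen for illustration.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

class TimingSketch {
    // Runs the task `warmups` times untimed, then returns the fastest of `runs` timed runs in nanoseconds.
    static long bestOfNanos(LongSupplier task, int warmups, int runs) {
        for (int i = 0; i < warmups; i++) task.getAsLong();
        long best = Long.MAX_VALUE;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.getAsLong();
            best = Math.min(best, System.nanoTime() - start);
        }
        return best;
    }

    // Trial division: deliberately CPU-heavy per element.
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long d = 2; d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    static long countPrimes(long upTo, boolean parallel) {
        LongStream candidates = LongStream.rangeClosed(1, upTo);
        if (parallel) candidates = candidates.parallel();
        return candidates.filter(TimingSketch::isPrime).count();
    }

    public static void main(String[] args) {
        long seq = bestOfNanos(() -> countPrimes(200_000, false), 3, 5);
        long par = bestOfNanos(() -> countPrimes(200_000, true), 3, 5);
        System.out.println("Sequential best: " + seq / 1_000_000 + " ms");
        System.out.println("Parallel best:   " + par / 1_000_000 + " ms");
    }
}
```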

Example 3: When to Use Parallel Streams

import java.util.*;
import java.util.stream.*;

public class WhenToUseParallel {
    public static void main(String[] args) {
        List<Integer> largeList = IntStream.rangeClosed(1, 1_000_000)
            .boxed()
            .collect(Collectors.toList());

        // GOOD candidate: a large dataset with independent per-element work.
        // Squaring is cheap, so the gain here comes from the element count;
        // heavier per-element work benefits more.
        // Note: this run goes first, so it also absorbs JIT warm-up time.
        long start1 = System.currentTimeMillis();
        long sumParallel = largeList.parallelStream()
            .filter(n -> n % 2 == 0)
            .mapToLong(n -> (long) n * n)  // widen to long to avoid int overflow
            .sum();
        long end1 = System.currentTimeMillis();
        System.out.println("Parallel sum of squares: " + sumParallel);
        System.out.println("Parallel time: " + (end1 - start1) + " ms");

        // Compare with sequential
        long start2 = System.currentTimeMillis();
        long sumSequential = largeList.stream()
            .filter(n -> n % 2 == 0)
            .mapToLong(n -> (long) n * n)
            .sum();
        long end2 = System.currentTimeMillis();
        System.out.println("Sequential time: " + (end2 - start2) + " ms");
    }
}
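
The example above builds a List<Integer> first, so every element is boxed and later unboxed. When the data is just a numeric range, a primitive stream avoids that cost and splits evenly. The sketch below is one way to write the same sum of even squares; the class and method names are illustrative.

```java
import java.util.stream.LongStream;

class PrimitiveRangeSketch {
    // Sum of squares of the even numbers in 1..upTo, with no boxing.
    static long sumOfEvenSquares(long upTo, boolean parallel) {
        LongStream range = LongStream.rangeClosed(1, upTo);
        if (parallel) range = range.parallel();
        return range.filter(n -> n % 2 == 0)
                    .map(n -> n * n)
                    .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquares(10, true)); // 4 + 16 + 36 + 64 + 100 = 220
    }
}
```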

Example 4: Ordering in Parallel Streams

import java.util.*;
import java.util.stream.*;

public class OrderingExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("Original order: " + numbers);

        // forEach on a parallel stream - elements may be printed in any order
        System.out.print("forEach (may be unordered): ");
        numbers.parallelStream()
            .map(n -> n * 2)
            .forEach(n -> System.out.print(n + " "));
        System.out.println();

        // Force ordering with forEachOrdered
        System.out.print("forEachOrdered: ");
        numbers.parallelStream()
            .map(n -> n * 2)
            .forEachOrdered(n -> System.out.print(n + " "));
        System.out.println();

        // collect(Collectors.toList()) keeps the encounter order of an ordered source,
        // even when the stream runs in parallel
        List<Integer> ordered = numbers.parallelStream()
            .map(n -> n * 2)
            .collect(Collectors.toList());
        System.out.println("Collect preserves order: " + ordered);
    }
}
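
Printed output is hard to compare by eye, so here is a small sketch that captures each terminal operation's result in a list instead. The class and method names are illustrative. The forEach variant needs a synchronized list because several threads add to it at once; forEachOrdered handles one element at a time, in encounter order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class EncounterOrderSketch {
    // Arrival order depends on thread scheduling; only the contents are guaranteed.
    static List<Integer> viaForEach(List<Integer> src) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        src.parallelStream().forEach(out::add);
        return out;
    }

    // Elements are handed over strictly in encounter order.
    static List<Integer> viaForEachOrdered(List<Integer> src) {
        List<Integer> out = new ArrayList<>();
        src.parallelStream().forEachOrdered(out::add);
        return out;
    }

    // The collector merges per-thread partial lists in encounter order.
    static List<Integer> viaCollect(List<Integer> src) {
        return src.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println("forEach:        " + viaForEach(numbers));
        System.out.println("forEachOrdered: " + viaForEachOrdered(numbers));
        System.out.println("collect:        " + viaCollect(numbers));
    }
}
```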

Example 5: Real-World Use Cases

import java.util.*;
import java.util.stream.*;
import java.nio.file.*;
import java.io.*;

public class RealWorldExamples {
    public static void main(String[] args) throws IOException {
        // Example 1: Processing large files in parallel
        // Note: readAllLines loads the whole file into memory first
        List<String> lines = Files.readAllLines(Paths.get("large_file.txt"));

        // Count words in parallel
        long wordCount = lines.parallelStream()
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            .filter(word -> !word.isEmpty())
            .count();
        System.out.println("Total words: " + wordCount);

        // Example 2: Image processing simulation
        List<String> imageFiles = Arrays.asList("img1.jpg", "img2.jpg", "img3.jpg",
            "img4.jpg", "img5.jpg", "img6.jpg");
        List<String> processedImages = imageFiles.parallelStream()
            .map(image -> {
                // Simulate image processing
                System.out.println("Processing " + image + " on thread: " +
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption
                    Thread.currentThread().interrupt();
                }
                return "processed_" + image;
            })
            .collect(Collectors.toList());
        System.out.println("Processed images: " + processedImages);

        // Example 3: Data analysis
        List<Double> sensorReadings = DoubleStream.generate(Math::random)
            .limit(1_000_000)
            .boxed()
            .collect(Collectors.toList());
        double average = sensorReadings.parallelStream()
            .mapToDouble(Double::doubleValue)
            .average()
            .orElse(0.0);
        System.out.println("Average reading: " + average);
    }
}
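
Counting all words is a plain reduction. If you also want a count per word, one option is Collectors.groupingByConcurrent, which lets all threads fill a single ConcurrentMap. The sketch below works on in-memory lines so it runs without a file; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class WordFrequencySketch {
    // Builds a word -> occurrences map from the given lines using a concurrent collector.
    static ConcurrentMap<String, Long> wordFrequencies(List<String> lines) {
        return lines.parallelStream()
            .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.groupingByConcurrent(word -> word, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        System.out.println(wordFrequencies(lines));
    }
}
```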

Example 6: Potential Pitfalls and Solutions

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ParallelPitfalls {
    private static int sharedCounter = 0;

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 10000)
            .boxed()
            .collect(Collectors.toList());

        // ❌ DANGEROUS: Shared mutable state
        numbers.parallelStream().forEach(n -> {
            sharedCounter++;  // RACE CONDITION: increments can be lost
        });
        // Often prints less than 10000
        System.out.println("Unsafe counter: " + sharedCounter);

        // ✅ SAFE: Using reduction
        int safeSum = numbers.parallelStream()
            .mapToInt(Integer::intValue)
            .sum();
        System.out.println("Safe sum: " + safeSum);

        // ✅ SAFE, but slower: every add contends for one lock,
        // and the insertion order is not the encounter order
        List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream()
            .map(n -> n * 2)
            .forEach(synchronizedList::add);
        System.out.println("Synchronized list size: " + synchronizedList.size());

        // ✅ BEST: collect() gives each thread its own container and merges them,
        // so no shared state or locking is needed
        List<Integer> safeList = numbers.parallelStream()
            .map(n -> n * 2)
            .collect(Collectors.toList());
        System.out.println("Safe collected list size: " + safeList.size());
    }
}
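
If you really do need a counter that several threads update, the java.util.concurrent.atomic classes are one safe option. The sketch below fixes the broken counter from the example in two ways, with a LongAdder and with a plain count() reduction; the class and helper names are illustrative.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SafeCountingSketch {
    // Hypothetical helper: the list 1..n.
    static List<Integer> rangeList(int n) {
        return IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
    }

    // Thread-safe side effect: LongAdder tolerates concurrent increments.
    static long countWithAdder(List<Integer> data) {
        LongAdder adder = new LongAdder();
        data.parallelStream().forEach(n -> adder.increment());
        return adder.sum();
    }

    // No side effect at all: let the stream do the counting.
    static long countWithReduction(List<Integer> data) {
        return data.parallelStream().count();
    }

    public static void main(String[] args) {
        List<Integer> numbers = rangeList(10_000);
        System.out.println("LongAdder count: " + countWithAdder(numbers));
        System.out.println("Reduction count: " + countWithReduction(numbers));
    }
}
```

The reduction is usually the better choice, since it needs no shared state at all.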

Example 7: Custom Thread Pool for Parallel Streams

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CustomThreadPool {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
            .boxed()
            .collect(Collectors.toList());

        // Create custom thread pool
        ForkJoinPool customThreadPool = new ForkJoinPool(4); // 4 threads
        try {
            // Starting the stream from a task inside this pool makes it run on the
            // pool's threads in current JDKs. This is observed behavior, not a
            // guarantee stated in the Stream API documentation.
            List<Integer> result = customThreadPool.submit(() ->
                numbers.parallelStream()
                    .map(n -> {
                        System.out.println("Processing " + n + " on " +
                            Thread.currentThread().getName());
                        return n * 2;
                    })
                    .collect(Collectors.toList())
            ).get();
            System.out.println("Result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }
}
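
Running a parallel stream inside a custom ForkJoinPool works in current JDKs, but the Stream documentation does not promise it. If you need firm control over the thread count, a different technique that uses only documented APIs is a plain ExecutorService with invokeAll. This is a substitute for the stream, not a way to configure it, and the class and method names below are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorAlternativeSketch {
    // Doubles every element using a fixed-size pool; results come back in input order.
    static List<Integer> doubleAll(List<Integer> input, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : input) {
                tasks.add(() -> n * 2);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the tasks
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 10; i++) numbers.add(i);
        System.out.println("Result: " + doubleAll(numbers, 4));
    }
}
```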

When to Use Parallel Streams

✅ Good Cases for Parallel Streams:

  • Large datasets (thousands+ elements)
  • CPU-intensive operations
  • Independent operations (no shared state)
  • Independent I/O tasks, provided they run in a dedicated pool so blocking calls don't tie up the shared common pool
  • When encounter order doesn't matter

❌ Avoid Parallel Streams When:

  • Small datasets (overhead > benefit)
  • Operations are already fast
  • Order-sensitive operations (limit, findFirst, forEachOrdered) dominate the pipeline, since preserving order limits the speedup
  • Operations have shared mutable state
  • I/O operations that can't be parallelized
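
One simple way to act on these lists is to choose the stream mode from the input size. The sketch below does that with a hypothetical helper; the threshold of 10,000 is an assumed placeholder, not a recommended value, and the right cut-off depends on how expensive each element is to process.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Stream;

class ParallelThresholdSketch {
    // Assumed cut-off for illustration only; measure to find a sensible value.
    static final int PARALLEL_THRESHOLD = 10_000;

    // Returns a parallel stream for large inputs and a sequential one otherwise.
    static <T> Stream<T> streamFor(Collection<T> data) {
        return data.size() >= PARALLEL_THRESHOLD ? data.parallelStream() : data.stream();
    }

    public static void main(String[] args) {
        System.out.println(streamFor(Arrays.asList(1, 2, 3)).isParallel()); // false
    }
}
```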

Best Practices

  1. Test Performance: Always measure sequential vs parallel performance
  2. Avoid Shared State: Use stateless operations and proper reduction
  3. Consider Ordering: Use forEachOrdered() when order matters
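  4. Watch Resource Usage: Parallel streams share the JVM-wide common ForkJoinPool by default, so one slow pipeline can delay the others
  5. Use Right Data Structures: ArrayList, arrays, and IntStream.range split evenly; LinkedList and Stream.iterate split poorly
  6. Avoid Nested Parallelism: Don't start parallel streams inside the lambdas of another parallel stream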
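
Point 5 comes down to how well a source can be split. A rough way to peek at this is to ask a source's Spliterator whether it knows its exact size, including after splitting (the SIZED and SUBSIZED characteristics). It is only a hint: a source can report both and still split slowly. The class and method names in this sketch are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;

class SplittabilitySketch {
    // True when the source knows its exact size before and after splitting.
    static boolean knowsExactSize(Spliterator<?> source) {
        return source.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    public static void main(String[] args) {
        // Array-backed list: can be cut into even halves
        System.out.println(knowsExactSize(new ArrayList<>(Arrays.asList(1, 2, 3)).spliterator())); // true
        // Stream.iterate: each element depends on the previous one, size unknown
        System.out.println(knowsExactSize(Stream.iterate(1, i -> i + 1).spliterator()));           // false
    }
}
```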

Performance Tips

// ✅ Good: Large dataset with heavy computation
largeList.parallelStream()
    .filter(item -> heavyComputation(item))
    .collect(Collectors.toList());

// ❌ Bad: Small dataset or simple operations
smallList.parallelStream()  // Overhead > benefit
    .map(x -> x + 1)
    .collect(Collectors.toList());

// ✅ Good: Order doesn't matter
list.parallelStream()
    .unordered()  // Can speed up stateful operations such as distinct() and limit()
    .filter(...)
    .collect(Collectors.toList());
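
The snippets above are fragments. As a complete, runnable sketch of the unordered() tip, the example below asks for any ten even numbers rather than the first ten, which frees limit() from tracking encounter order. The class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnorderedLimitSketch {
    // Returns `howMany` even numbers from [0, bound); which ones is unspecified.
    static List<Integer> anyEvens(int bound, int howMany) {
        return IntStream.range(0, bound)
            .parallel()
            .unordered()             // we accept any matching elements
            .filter(n -> n % 2 == 0)
            .limit(howMany)
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(anyEvens(1_000, 10));
    }
}
```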

Conclusion

Parallel streams are like having a team of workers instead of working alone:

  • Automatic parallelization without complex threading code
  • Potentially significant speedup for CPU-intensive tasks on multi-core systems
  • Safe result combining through collectors and reductions, as long as your own code avoids shared mutable state
  • Simple API - just add .parallel() or use .parallelStream()

Remember: Parallel streams aren't always faster. They work best with large datasets and computationally expensive operations. Always test and measure performance for your specific use case!

They bring the power of parallel processing to everyday Java programming, making your applications faster and more efficient when used correctly.
