Thread Pool with ExecutorService in Java

ExecutorService is a higher-level alternative to creating and managing threads directly. Its implementations typically maintain a pool of worker threads, queue the tasks you submit, and expose methods for submitting work and shutting the pool down.

1. Basic ExecutorService Usage

Creating Different Types of ExecutorServices

import java.util.concurrent.*;
import java.util.*;
public class BasicExecutorService {
static class Task implements Runnable {
private String taskName;
private int duration;
public Task(String name, int duration) {
this.taskName = name;
this.duration = duration;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + 
" executing " + taskName + " for " + duration + " seconds");
try {
// Simulate work
for (int i = 1; i <= duration; i++) {
System.out.println(Thread.currentThread().getName() + 
" - " + taskName + " progress: " + i + "/" + duration);
Thread.sleep(1000);
}
System.out.println(Thread.currentThread().getName() + 
" completed " + taskName);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + 
" interrupted while executing " + taskName);
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Basic ExecutorService Usage ===");
// 1. Fixed Thread Pool
demonstrateFixedThreadPool();
// 2. Cached Thread Pool
demonstrateCachedThreadPool();
// 3. Single Thread Executor
demonstrateSingleThreadExecutor();
// 4. Scheduled Thread Pool
demonstrateScheduledThreadPool();
// 5. Custom Thread Pool
demonstrateCustomThreadPool();
}
public static void demonstrateFixedThreadPool() throws InterruptedException {
System.out.println("\n=== FixedThreadPool (3 threads) ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit 6 tasks to 3-thread pool
for (int i = 1; i <= 6; i++) {
Task task = new Task("FixedPool-Task-" + i, 2);
executor.execute(task);
}
// Shutdown gracefully
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("FixedThreadPool terminated: " + terminated);
}
public static void demonstrateCachedThreadPool() throws InterruptedException {
System.out.println("\n=== CachedThreadPool ===");
ExecutorService executor = Executors.newCachedThreadPool();
// Submit tasks with varying durations
for (int i = 1; i <= 8; i++) {
int duration = 1 + (i % 3); // 1, 2, or 3 seconds
Task task = new Task("CachedPool-Task-" + i, duration);
executor.execute(task);
}
// Pause so that threads idled by the first batch can be reused
Thread.sleep(2000);
System.out.println("Submitting more tasks after delay...");
for (int i = 9; i <= 12; i++) {
Task task = new Task("CachedPool-Task-" + i, 1);
executor.execute(task);
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public static void demonstrateSingleThreadExecutor() throws InterruptedException {
System.out.println("\n=== SingleThreadExecutor ===");
ExecutorService executor = Executors.newSingleThreadExecutor();
// Tasks will execute sequentially
for (int i = 1; i <= 5; i++) {
Task task = new Task("SingleThread-Task-" + i, 1);
executor.execute(task);
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public static void demonstrateScheduledThreadPool() throws InterruptedException {
System.out.println("\n=== ScheduledThreadPool ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// Schedule one-time task
System.out.println("Scheduling one-time task in 2 seconds...");
ScheduledFuture<?> oneTimeFuture = scheduler.schedule(
() -> System.out.println("One-time task executed!"),
2, TimeUnit.SECONDS
);
// Schedule fixed-rate task
System.out.println("Scheduling fixed-rate task (every 3 seconds)...");
ScheduledFuture<?> fixedRateFuture = scheduler.scheduleAtFixedRate(
() -> System.out.println("Fixed-rate task executed at " + new Date()),
1, 3, TimeUnit.SECONDS
);
// Schedule fixed-delay task
System.out.println("Scheduling fixed-delay task (3 seconds between executions)...");
ScheduledFuture<?> fixedDelayFuture = scheduler.scheduleWithFixedDelay(
() -> {
System.out.println("Fixed-delay task started at " + new Date());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Fixed-delay task completed");
},
1, 3, TimeUnit.SECONDS
);
// Let it run for 10 seconds
Thread.sleep(10000);
// Cancel periodic tasks
fixedRateFuture.cancel(false);
fixedDelayFuture.cancel(false);
scheduler.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
}
public static void demonstrateCustomThreadPool() throws InterruptedException {
System.out.println("\n=== Custom ThreadPool ===");
// Create custom thread pool with specific parameters
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
2, // core pool size
4, // maximum pool size
60, // keep-alive time
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), // queue capacity
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
);
System.out.println("Custom ThreadPool created:");
System.out.println("Core threads: " + customExecutor.getCorePoolSize());
System.out.println("Max threads: " + customExecutor.getMaximumPoolSize());
System.out.println("Queue capacity: " + customExecutor.getQueue().remainingCapacity());
// Submit tasks
for (int i = 1; i <= 8; i++) {
final int taskId = i;
customExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + 
" executing task " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Small delay to see pool behavior (this method already declares InterruptedException)
Thread.sleep(100);
// Monitor pool stats
System.out.printf("Task %d submitted - Pool: %d, Active: %d, Queue: %d%n",
taskId,
customExecutor.getPoolSize(),
customExecutor.getActiveCount(),
customExecutor.getQueue().size());
}
customExecutor.shutdown();
customExecutor.awaitTermination(20, TimeUnit.SECONDS);
}
}

2. Submitting Tasks and Handling Results

Using submit() with Futures

import java.util.concurrent.*;
import java.util.*;
public class TaskSubmissionWithFutures {
static class ComputationTask implements Callable<Long> {
private String taskName;
private int number;
public ComputationTask(String name, int number) {
this.taskName = name;
this.number = number;
}
@Override
public Long call() throws Exception {
System.out.println(Thread.currentThread().getName() + 
" starting computation for " + taskName);
// Simulate computation time
Thread.sleep(2000);
// Compute factorial
long result = 1;
for (int i = 2; i <= number; i++) {
result *= i;
}
System.out.println(Thread.currentThread().getName() + 
" completed " + taskName + " = " + result);
return result;
}
}
static class FileProcessor implements Callable<String> {
private String fileName;
public FileProcessor(String fileName) {
this.fileName = fileName;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + 
" processing file: " + fileName);
// Simulate file processing
int processingTime = 1000 + (int)(Math.random() * 3000);
Thread.sleep(processingTime);
// Simulate occasional failure
if (Math.random() < 0.2) {
throw new RuntimeException("Failed to process file: " + fileName);
}
return "Processed: " + fileName + " in " + processingTime + "ms";
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== Task Submission with Futures ===");
// 1. Basic Future usage
demonstrateBasicFutures();
// 2. Handling multiple Futures
demonstrateMultipleFutures();
// 3. Exception handling with Futures
demonstrateFutureExceptionHandling();
// 4. Using CompletableFuture
demonstrateCompletableFuture();
}
public static void demonstrateBasicFutures() throws Exception {
System.out.println("\n=== Basic Future Usage ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
// Submit Callable tasks and get Futures
Future<Long> future1 = executor.submit(new ComputationTask("Factorial-5", 5));
Future<Long> future2 = executor.submit(new ComputationTask("Factorial-7", 7));
Future<Long> future3 = executor.submit(new ComputationTask("Factorial-10", 10));
System.out.println("Tasks submitted, checking status...");
// Check if tasks are done
System.out.println("Task1 done: " + future1.isDone());
System.out.println("Task2 done: " + future2.isDone());
System.out.println("Task3 done: " + future3.isDone());
// Get results (this will block until results are available)
System.out.println("Getting results...");
Long result1 = future1.get();
Long result2 = future2.get();
Long result3 = future3.get();
System.out.println("Results: " + result1 + ", " + result2 + ", " + result3);
System.out.println("All tasks completed: " + 
(future1.isDone() && future2.isDone() && future3.isDone()));
executor.shutdown();
}
public static void demonstrateMultipleFutures() throws Exception {
System.out.println("\n=== Handling Multiple Futures ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit multiple file processing tasks
List<Future<String>> futures = new ArrayList<>();
String[] files = {"file1.txt", "file2.log", "file3.dat", "file4.csv", "file5.xml"};
for (String file : files) {
Future<String> future = executor.submit(new FileProcessor(file));
futures.add(future);
}
System.out.println("All file processing tasks submitted");
// Collect results in submission order; each get() waits up to 5 seconds
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
try {
// Get with timeout
String result = future.get(5, TimeUnit.SECONDS);
results.add(result);
System.out.println("Got result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timed out, cancelling...");
future.cancel(true);
} catch (ExecutionException e) {
System.out.println("Task failed: " + e.getCause().getMessage());
}
}
System.out.println("Successfully processed " + results.size() + " files");
System.out.println("Results: " + results);
executor.shutdown();
}
public static void demonstrateFutureExceptionHandling() throws InterruptedException {
System.out.println("\n=== Future Exception Handling ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
// Submit tasks that might fail
Future<String> successFuture = executor.submit(() -> {
Thread.sleep(1000);
return "Success result";
});
Future<String> failureFuture = executor.submit(() -> {
Thread.sleep(1000);
throw new IllegalArgumentException("Simulated task failure");
});
// Handle successful future
try {
String result = successFuture.get();
System.out.println("Success task result: " + result);
} catch (ExecutionException e) {
System.out.println("Success task failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for success task");
}
// Handle failed future
try {
String result = failureFuture.get();
System.out.println("Failure task result: " + result);
} catch (ExecutionException e) {
System.out.println("Failure task failed as expected: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for failure task");
}
// Check cancellation
Future<String> cancellableFuture = executor.submit(() -> {
Thread.sleep(5000);
return "This should be cancelled";
});
// Cancel after 1 second
Thread.sleep(1000);
boolean cancelled = cancellableFuture.cancel(true);
System.out.println("Task cancelled: " + cancelled);
System.out.println("Task is cancelled: " + cancellableFuture.isCancelled());
executor.shutdown();
}
public static void demonstrateCompletableFuture() throws Exception {
System.out.println("\n=== CompletableFuture ===");
// Using CompletableFuture for asynchronous programming
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " executing async task 1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Result from task 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " executing async task 2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Result from task 2";
});
// Combine the two results once both futures have completed
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " + " + result2;
});
// Handle completion
combinedFuture.thenAccept(result -> {
System.out.println("Combined result: " + result);
});
// Wait for completion
String finalResult = combinedFuture.get(5, TimeUnit.SECONDS);
System.out.println("Final result: " + finalResult);
// Exception handling with CompletableFuture
CompletableFuture<String> exceptionalFuture = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return "Success";
});
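CompletableFuture<String> handledFuture = exceptionalFuture.exceptionally(throwable -> {
// supplyAsync wraps the original exception in a CompletionException, so unwrap it
Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
? throwable.getCause() : throwable;
return "Recovered from: " + cause.getMessage();
});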
System.out.println("Exception handling result: " + handledFuture.get());
}
}

3. Thread Pool Management and Monitoring

Advanced Thread Pool Management

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class ThreadPoolManagement {
static class MonitoringTask implements Runnable {
private String taskName;
private int duration;
public MonitoringTask(String name, int duration) {
this.taskName = name;
this.duration = duration;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + 
" started " + taskName + " at " + new Date(startTime));
try {
// Simulate work
for (int i = 1; i <= duration; i++) {
System.out.println(Thread.currentThread().getName() + 
" - " + taskName + " working... (" + i + "/" + duration + ")");
Thread.sleep(1000);
// Check for interruption
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + 
" interrupted during " + taskName);
return;
}
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + 
" completed " + taskName + " in " + 
(endTime - startTime) + "ms");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + 
" interrupted during " + taskName);
Thread.currentThread().interrupt();
}
}
}
static class ThreadPoolMonitor implements Runnable {
private ThreadPoolExecutor executor;
private AtomicBoolean shutdown;
public ThreadPoolMonitor(ThreadPoolExecutor executor, AtomicBoolean shutdown) {
this.executor = executor;
this.shutdown = shutdown;
}
@Override
public void run() {
while (!shutdown.get()) {
System.out.println("\n=== Thread Pool Statistics ===");
System.out.println("Pool Size: " + executor.getPoolSize());
System.out.println("Active Count: " + executor.getActiveCount());
System.out.println("Core Pool Size: " + executor.getCorePoolSize());
System.out.println("Maximum Pool Size: " + executor.getMaximumPoolSize());
System.out.println("Largest Pool Size: " + executor.getLargestPoolSize());
System.out.println("Task Count: " + executor.getTaskCount());
System.out.println("Completed Task Count: " + executor.getCompletedTaskCount());
System.out.println("Queue Size: " + executor.getQueue().size());
System.out.println("Queue Remaining Capacity: " + executor.getQueue().remainingCapacity());
System.out.println("============================\n");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("Monitor interrupted");
break;
}
}
System.out.println("ThreadPoolMonitor stopped");
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== Thread Pool Management and Monitoring ===");
// 1. Thread Pool with Monitoring
demonstrateThreadPoolMonitoring();
// 2. Graceful Shutdown
demonstrateGracefulShutdown();
// 3. Built-in Rejection Policies
demonstrateRejectionPolicies();
// 4. Dynamic Thread Pool Adjustment
demonstrateDynamicAdjustment();
}
public static void demonstrateThreadPoolMonitoring() throws InterruptedException {
System.out.println("\n=== Thread Pool Monitoring ===");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
AtomicBoolean shutdown = new AtomicBoolean(false);
// Start monitoring
Thread monitorThread = new Thread(new ThreadPoolMonitor(executor, shutdown));
monitorThread.setDaemon(true);
monitorThread.start();
// Submit tasks
for (int i = 1; i <= 10; i++) {
try {
executor.execute(new MonitoringTask("Task-" + i, 3));
System.out.println("Submitted Task-" + i);
Thread.sleep(500); // Slow submission to see pool behavior
} catch (RejectedExecutionException e) {
System.out.println("Task-" + i + " rejected: " + e.getMessage());
}
}
// Let tasks complete
Thread.sleep(5000);
// Stop monitoring and shutdown
shutdown.set(true);
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Executor terminated: " + terminated);
}
public static void demonstrateGracefulShutdown() throws InterruptedException {
System.out.println("\n=== Graceful Shutdown ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit long-running tasks
for (int i = 1; i <= 6; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + 
" starting long task " + taskId);
try {
Thread.sleep(5000); // Long task
System.out.println(Thread.currentThread().getName() + 
" completed task " + taskId);
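} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + 
" interrupted in task " + taskId);
// Restore the interrupt flag so the pool's worker thread can observe it
Thread.currentThread().interrupt();
}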
});
}
// Wait a bit then shutdown
Thread.sleep(1000);
System.out.println("Initiating shutdown...");
// Step 1: Prevent new tasks
executor.shutdown();
// Step 2: Wait for existing tasks to complete
System.out.println("Waiting for tasks to complete...");
boolean completed = executor.awaitTermination(10, TimeUnit.SECONDS);
if (!completed) {
System.out.println("Timeout reached, forcing shutdown...");
// Step 3: Force shutdown if timeout reached
List<Runnable> unfinishedTasks = executor.shutdownNow();
System.out.println("Cancelled " + unfinishedTasks.size() + " tasks");
// Wait a bit more for threads to respond to interrupt
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
} else {
System.out.println("All tasks completed gracefully");
}
}
public static void demonstrateRejectionPolicies() throws InterruptedException {
System.out.println("\n=== Rejection Policies ===");
// Test different rejection policies
testRejectionPolicy("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
testRejectionPolicy("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
testRejectionPolicy("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
testRejectionPolicy("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
}
public static void testRejectionPolicy(String policyName, 
RejectedExecutionHandler policy) 
throws InterruptedException {
System.out.println("\nTesting " + policyName + ":");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
policy
);
// Submit more tasks than pool can handle
for (int i = 1; i <= 5; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + 
" executing task " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Task " + taskId + " submitted successfully");
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " rejected: " + e.getMessage());
}
Thread.sleep(100); // Small delay between submissions
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
public static void demonstrateDynamicAdjustment() throws InterruptedException {
System.out.println("\n=== Dynamic Thread Pool Adjustment ===");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
System.out.println("Initial pool - Core: " + executor.getCorePoolSize() + 
", Max: " + executor.getMaximumPoolSize());
// Submit initial tasks
for (int i = 1; i <= 8; i++) {
executor.execute(createShortTask("Initial-Task-" + i));
}
Thread.sleep(2000);
System.out.println("After initial load - Pool: " + executor.getPoolSize() + 
", Active: " + executor.getActiveCount());
// Increase core pool size dynamically
System.out.println("Increasing core pool size to 4...");
executor.setCorePoolSize(4);
// Submit more tasks
for (int i = 9; i <= 12; i++) {
executor.execute(createShortTask("Additional-Task-" + i));
}
Thread.sleep(2000);
System.out.println("After adjustment - Pool: " + executor.getPoolSize() + 
", Active: " + executor.getActiveCount());
// Decrease core pool size
System.out.println("Decreasing core pool size to 2...");
executor.setCorePoolSize(2);
// Excess idle threads only exit after the 10-second keep-alive, so wait longer than that
Thread.sleep(15000);
System.out.println("After decrease - Pool: " + executor.getPoolSize() + 
", Active: " + executor.getActiveCount());
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
private static Runnable createShortTask(String name) {
return () -> {
System.out.println(Thread.currentThread().getName() + " executing " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
}
}

4. Real-World Examples

Web Server Simulation

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*; // Date, List, ArrayList
public class RealWorldExecutorExamples {
// Simulated web request handler
static class RequestHandler implements Runnable {
private final int requestId;
private final String clientIp;
private static final AtomicInteger requestCounter = new AtomicInteger(0);
public RequestHandler(String clientIp) {
this.requestId = requestCounter.incrementAndGet();
this.clientIp = clientIp;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
long startTime = System.currentTimeMillis();
System.out.println("[" + new Date(startTime) + "] " + 
threadName + " handling request #" + requestId + " from " + clientIp);
try {
// Simulate different types of requests
String requestType = simulateRequestType();
int processingTime = simulateProcessingTime(requestType);
System.out.println(threadName + " - Request #" + requestId + 
" type: " + requestType + ", estimated time: " + 
processingTime + "ms");
// Process request
Thread.sleep(processingTime);
long endTime = System.currentTimeMillis();
System.out.println("[" + new Date(endTime) + "] " + 
threadName + " completed request #" + requestId + 
" in " + (endTime - startTime) + "ms");
} catch (InterruptedException e) {
System.out.println(threadName + " interrupted while handling request #" + requestId);
Thread.currentThread().interrupt();
}
}
private String simulateRequestType() {
String[] types = {"GET", "POST", "PUT", "DELETE", "API_CALL"};
return types[(int)(Math.random() * types.length)];
}
private int simulateProcessingTime(String requestType) {
switch (requestType) {
case "GET": return 100 + (int)(Math.random() * 400);
case "POST": return 200 + (int)(Math.random() * 600);
case "PUT": return 300 + (int)(Math.random() * 700);
case "DELETE": return 150 + (int)(Math.random() * 500);
case "API_CALL": return 500 + (int)(Math.random() * 1000);
default: return 200;
}
}
}
// Web server simulation
static class WebServer {
private final ThreadPoolExecutor requestExecutor;
private final ScheduledExecutorService monitorExecutor;
private final AtomicInteger activeRequests;
private volatile boolean running;
public WebServer(int minThreads, int maxThreads, int queueCapacity) {
this.activeRequests = new AtomicInteger(0);
this.running = true;
// Create request processing thread pool
this.requestExecutor = new ThreadPoolExecutor(
minThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadFactory() {
private AtomicInteger threadCounter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "RequestHandler-" + threadCounter.getAndIncrement());
thread.setDaemon(true);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
// Create monitoring service
this.monitorExecutor = Executors.newScheduledThreadPool(1);
startMonitoring();
}
public void handleRequest(String clientIp) {
if (!running) {
System.out.println("Server not accepting new requests");
return;
}
activeRequests.incrementAndGet();
RequestHandler handler = new RequestHandler(clientIp);
try {
requestExecutor.execute(() -> {
try {
handler.run();
} finally {
activeRequests.decrementAndGet();
}
});
} catch (RejectedExecutionException e) {
activeRequests.decrementAndGet();
System.out.println("Request rejected from " + clientIp + " - server overloaded");
}
}
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
System.out.println("\n=== Server Status ===");
System.out.println("Active Requests: " + activeRequests.get());
System.out.println("Thread Pool - Pool: " + requestExecutor.getPoolSize() + 
", Active: " + requestExecutor.getActiveCount() +
", Queue: " + requestExecutor.getQueue().size());
System.out.println("Completed Tasks: " + requestExecutor.getCompletedTaskCount());
System.out.println("========================\n");
}, 1, 5, TimeUnit.SECONDS);
}
public void shutdown() {
System.out.println("Shutting down web server...");
// Stop accepting new requests
running = false;
// Stop the periodic status monitor
monitorExecutor.shutdown();
// Graceful shutdown of request executor
requestExecutor.shutdown();
try {
if (!requestExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("Forcing shutdown...");
requestExecutor.shutdownNow();
}
} catch (InterruptedException e) {
requestExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("Web server shutdown complete");
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Real-World ExecutorService Examples ===");
// 1. Web Server Simulation
demonstrateWebServer();
// 2. Data Processing Pipeline
demonstrateDataProcessingPipeline();
// 3. Batch Job Processing
demonstrateBatchJobProcessing();
}
public static void demonstrateWebServer() throws InterruptedException {
System.out.println("\n=== Web Server Simulation ===");
WebServer server = new WebServer(2, 5, 10);
// Simulate incoming requests
Thread requestSimulator = new Thread(() -> {
for (int i = 1; i <= 20; i++) {
String clientIp = "192.168.1." + (100 + i);
server.handleRequest(clientIp);
// Random delay between requests
try {
Thread.sleep(100 + (int)(Math.random() * 400));
} catch (InterruptedException e) {
break;
}
}
});
requestSimulator.start();
requestSimulator.join();
// Wait for requests to process
Thread.sleep(5000);
server.shutdown();
}
public static void demonstrateDataProcessingPipeline() throws InterruptedException {
System.out.println("\n=== Data Processing Pipeline ===");
ExecutorService extractionExecutor = Executors.newFixedThreadPool(2);
ExecutorService transformationExecutor = Executors.newFixedThreadPool(3);
ExecutorService loadingExecutor = Executors.newFixedThreadPool(2);
CompletionService<String> completionService = 
new ExecutorCompletionService<>(loadingExecutor);
// Extract phase
List<Future<String>> extractionFutures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int dataSourceId = i;
Future<String> extractionFuture = extractionExecutor.submit(() -> {
System.out.println("Extracting from source " + dataSourceId);
Thread.sleep(1000 + (int)(Math.random() * 2000));
return "Data from source " + dataSourceId;
});
extractionFutures.add(extractionFuture);
}
// Transform phase (depends on extraction)
List<Future<String>> transformationFutures = new ArrayList<>();
for (Future<String> extractionFuture : extractionFutures) {
Future<String> transformationFuture = transformationExecutor.submit(() -> {
String rawData = extractionFuture.get();
System.out.println("Transforming: " + rawData);
Thread.sleep(1500);
return rawData + " -> TRANSFORMED";
});
transformationFutures.add(transformationFuture);
}
// Load phase (depends on transformation)
for (Future<String> transformationFuture : transformationFutures) {
completionService.submit(() -> {
String transformedData = transformationFuture.get();
System.out.println("Loading: " + transformedData);
Thread.sleep(1000);
return "LOADED: " + transformedData;
});
}
// Collect results
System.out.println("Collecting pipeline results...");
for (int i = 0; i < transformationFutures.size(); i++) {
try {
Future<String> loadedFuture = completionService.take();
String result = loadedFuture.get();
System.out.println("Pipeline result: " + result);
} catch (ExecutionException e) {
System.out.println("Pipeline stage failed: " + e.getCause().getMessage());
}
}
// Shutdown executors
extractionExecutor.shutdown();
transformationExecutor.shutdown();
loadingExecutor.shutdown();
extractionExecutor.awaitTermination(10, TimeUnit.SECONDS);
transformationExecutor.awaitTermination(10, TimeUnit.SECONDS);
loadingExecutor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Data processing pipeline completed");
}
public static void demonstrateBatchJobProcessing() throws InterruptedException {
System.out.println("\n=== Batch Job Processing ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
ExecutorService workerExecutor = Executors.newFixedThreadPool(4);
AtomicInteger jobCounter = new AtomicInteger(0);
CountDownLatch batchLatch = new CountDownLatch(3); // Process 3 batches
// Schedule batch jobs
ScheduledFuture<?> batchJob = scheduler.scheduleAtFixedRate(() -> {
int batchId = jobCounter.incrementAndGet();
System.out.println("Starting batch job #" + batchId + " at " + new Date());
// Submit tasks for this batch
List<Future<String>> batchFutures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
Future<String> future = workerExecutor.submit(() -> {
System.out.println("Batch#" + batchId + " - Processing task " + taskId);
Thread.sleep(1000 + (int)(Math.random() * 2000));
return "Batch#" + batchId + "-Task" + taskId + " completed";
});
batchFutures.add(future);
}
// Wait for batch completion
try {
for (Future<String> future : batchFutures) {
String result = future.get();
System.out.println("Batch#" + batchId + " - " + result);
}
System.out.println("Batch job #" + batchId + " completed");
batchLatch.countDown();
} catch (Exception e) {
System.out.println("Batch job #" + batchId + " failed: " + e.getMessage());
}
}, 0, 10, TimeUnit.SECONDS); // Run every 10 seconds
// Wait for 3 batches to complete
batchLatch.await();
// Cancel further batches
batchJob.cancel(false);
scheduler.shutdown();
workerExecutor.shutdown();
System.out.println("Batch job processing completed");
}
}

Summary

Key ExecutorService Types:

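  1. FixedThreadPool: Fixed number of threads; extra tasks wait in an unbounded queue
  2. CachedThreadPool: Creates threads on demand, reuses idle ones, and removes threads idle for 60 seconds
  3. SingleThreadExecutor: Single thread for sequential execution
  4. ScheduledThreadPool: For delayed and periodic tasks
  5. WorkStealingPool: ForkJoinPool-backed pool (Executors.newWorkStealingPool()) for parallel processing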
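
WorkStealingPool is the only type in this list that the examples above do not exercise. A minimal sketch of how it might be used follows; the class name WorkStealingPoolSketch and the sumOfSquares helper are hypothetical names chosen for illustration, and the chunking scheme is just one reasonable way to split the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not part of the original listings
class WorkStealingPoolSketch {

    // Sums the squares of 1..n by splitting the range into independent chunks
    static long sumOfSquares(int n, int chunkSize)
            throws InterruptedException, ExecutionException {
        // Parallelism defaults to the number of available processors
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Long>> chunks = new ArrayList<>();
            for (int start = 1; start <= n; start += chunkSize) {
                final int from = start;
                final int to = Math.min(n, start + chunkSize - 1);
                chunks.add(() -> {
                    long partial = 0;
                    for (long i = from; i <= to; i++) {
                        partial += i * i;
                    }
                    return partial;
                });
            }
            // invokeAll blocks until every chunk has finished
            long total = 0;
            for (Future<Long> chunkResult : pool.invokeAll(chunks)) {
                total += chunkResult.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(1_000, 100)); // prints 333833500
    }
}
```

Because the chunks are independent and short, idle workers can pick up queued chunks from busier workers, which is the situation this pool type is aimed at. Its worker threads are daemon threads, so results should be collected before the JVM exits.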

Important Methods:

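  1. execute(Runnable): Submit a task without a return value
  2. submit(Callable/Runnable): Submit a task and return a Future for its result or failure
  3. invokeAll(): Submit multiple tasks and block until all of them have finished
  4. invokeAny(): Submit multiple tasks and return the result of one that completes successfully, cancelling the rest
  5. shutdown(): Graceful shutdown; no new tasks are accepted, already submitted tasks still run
  6. shutdownNow(): Forceful shutdown; interrupts running tasks and returns the tasks that never started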
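
The earlier sections use execute() and submit() but not invokeAll() or invokeAny(). The sketch below shows one way the two bulk methods might be used; the class InvokeAllAnySketch, its method names, and the "replica" messages are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not part of the original listings
class InvokeAllAnySketch {

    // invokeAll: run every task and collect all results in task order
    static List<Integer> squareAll(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer x : inputs) {
                tasks.add(() -> x * x);
            }
            List<Integer> results = new ArrayList<>();
            // The returned futures are already done and follow the order of the task list
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    // invokeAny: return the result of one task that completed without throwing
    static String firstSuccessful() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> { throw new IllegalStateException("replica A unavailable"); });
            tasks.add(() -> "answer from replica B");
            // The failing task is ignored because another task succeeds
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = new ArrayList<>();
        inputs.add(1);
        inputs.add(2);
        inputs.add(3);
        System.out.println(squareAll(inputs));   // prints [1, 4, 9]
        System.out.println(firstSuccessful());   // prints answer from replica B
    }
}
```

invokeAny() only throws ExecutionException when every task fails, so it suits "ask several sources, take the first good answer" situations.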

Best Practices:

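  1. Always shut down an ExecutorService (ideally in a finally block) to prevent thread and resource leaks
  2. Size the pool to the workload: roughly the number of CPU cores for CPU-bound tasks, more for tasks that block on I/O
  3. Choose a rejection policy deliberately, or catch RejectedExecutionException, for when the pool and its queue are saturated
  4. Use Futures for task results and exception handling
  5. Monitor thread pool metrics for performance tuning
  6. Use CompletableFuture for complex asynchronous workflows
  7. Choose the queue type to match requirements; a bounded queue keeps the backlog from growing without limit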
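
As a rough illustration of the sizing and shutdown advice, the sketch below applies a commonly cited heuristic: threads ≈ cores × target utilization × (1 + wait time / compute time). The heuristic is not taken from the examples above, the class PoolSizingSketch and the method suggestedPoolSize are hypothetical, and the wait-to-compute ratio of 4.0 is an assumed figure that would have to be measured for a real workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the original listings
class PoolSizingSketch {

    // Heuristic: cores * targetUtilization * (1 + waitTime / computeTime), at least 1
    static int suggestedPoolSize(int cores, double targetUtilization, double waitToComputeRatio) {
        long size = Math.round(cores * targetUtilization * (1 + waitToComputeRatio));
        return (int) Math.max(1, size);
    }

    public static void main(String[] args) throws InterruptedException {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed ratio: tasks spend about four times longer waiting on I/O than computing
        int poolSize = suggestedPoolSize(cores, 1.0, 4.0);

        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            executor.execute(() ->
                System.out.println("Running on a pool of " + poolSize + " threads"));
        } finally {
            // Always release the pool's threads, even if submission fails
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        }
    }
}
```

With a ratio of 0.0 (pure computation) the suggestion collapses to the core count, which matches the usual advice for CPU-bound work. Treat the result as a starting point to validate against the pool metrics shown in section 3.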

Common Patterns:

  1. Producer-Consumer: Thread pool as consumer, task queue as buffer
  2. Pipeline Processing: Multiple executors for different stages
  3. Fan-out/Fan-in: Submit multiple tasks, aggregate results
  4. Periodic Tasks: ScheduledExecutorService for timers and heartbeats
  5. Resource Pooling: Reuse threads for better performance
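
The fan-out/fan-in pattern can be sketched with CompletableFuture.allOf, which completes once every branch has completed. This is one possible shape rather than the only one; the class FanOutFanInSketch and the totalLength helper are hypothetical names, and measuring string lengths stands in for real per-item work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class, not part of the original listings
class FanOutFanInSketch {

    static int totalLength(List<String> words) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Fan-out: one asynchronous task per input item
            List<CompletableFuture<Integer>> parts = new ArrayList<>();
            for (String word : words) {
                parts.add(CompletableFuture.supplyAsync(word::length, executor));
            }

            // Fan-in: wait for every branch, then aggregate the partial results
            CompletableFuture<Void> allDone =
                CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]));
            return allDone.thenApply(ignored -> {
                int sum = 0;
                for (CompletableFuture<Integer> part : parts) {
                    sum += part.join(); // does not block: every part is already complete
                }
                return sum;
            }).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();
        words.add("fan");
        words.add("out");
        words.add("in");
        System.out.println(totalLength(words)); // prints 8
    }
}
```

If any branch fails, the final join() throws a CompletionException carrying the original cause, so aggregation code usually needs an exceptionally() or handle() stage in real use.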

ExecutorService provides a robust, efficient way to manage concurrent tasks in Java, making it the preferred approach over manual thread management in most scenarios.
