ExecutorService provides a higher-level replacement for working with threads directly. It manages a pool of threads and provides methods for submitting tasks for execution.
1. Basic ExecutorService Usage
Creating Different Types of ExecutorServices
import java.util.concurrent.*;
import java.util.*;
public class BasicExecutorService {
static class Task implements Runnable {
private String taskName;
private int duration;
public Task(String name, int duration) {
this.taskName = name;
this.duration = duration;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" executing " + taskName + " for " + duration + " seconds");
try {
// Simulate work
for (int i = 1; i <= duration; i++) {
System.out.println(Thread.currentThread().getName() +
" - " + taskName + " progress: " + i + "/" + duration);
Thread.sleep(1000);
}
System.out.println(Thread.currentThread().getName() +
" completed " + taskName);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" interrupted while executing " + taskName);
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Basic ExecutorService Usage ===");
// 1. Fixed Thread Pool
demonstrateFixedThreadPool();
// 2. Cached Thread Pool
demonstrateCachedThreadPool();
// 3. Single Thread Executor
demonstrateSingleThreadExecutor();
// 4. Scheduled Thread Pool
demonstrateScheduledThreadPool();
// 5. Custom Thread Pool
demonstrateCustomThreadPool();
}
public static void demonstrateFixedThreadPool() throws InterruptedException {
System.out.println("\n=== FixedThreadPool (3 threads) ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit 6 tasks to 3-thread pool
for (int i = 1; i <= 6; i++) {
Task task = new Task("FixedPool-Task-" + i, 2);
executor.execute(task);
}
// Shutdown gracefully
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("FixedThreadPool terminated: " + terminated);
}
public static void demonstrateCachedThreadPool() throws InterruptedException {
System.out.println("\n=== CachedThreadPool ===");
ExecutorService executor = Executors.newCachedThreadPool();
// Submit tasks with varying durations
for (int i = 1; i <= 8; i++) {
int duration = 1 + (i % 3); // 1, 2, or 3 seconds
Task task = new Task("CachedPool-Task-" + i, duration);
executor.execute(task);
}
// Monitor thread creation
Thread.sleep(2000);
System.out.println("Submitted more tasks after delay...");
for (int i = 9; i <= 12; i++) {
Task task = new Task("CachedPool-Task-" + i, 1);
executor.execute(task);
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public static void demonstrateSingleThreadExecutor() throws InterruptedException {
System.out.println("\n=== SingleThreadExecutor ===");
ExecutorService executor = Executors.newSingleThreadExecutor();
// Tasks will execute sequentially
for (int i = 1; i <= 5; i++) {
Task task = new Task("SingleThread-Task-" + i, 1);
executor.execute(task);
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
public static void demonstrateScheduledThreadPool() throws InterruptedException {
System.out.println("\n=== ScheduledThreadPool ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// Schedule one-time task
System.out.println("Scheduling one-time task in 2 seconds...");
ScheduledFuture<?> oneTimeFuture = scheduler.schedule(
() -> System.out.println("One-time task executed!"),
2, TimeUnit.SECONDS
);
// Schedule fixed-rate task
System.out.println("Scheduling fixed-rate task (every 3 seconds)...");
ScheduledFuture<?> fixedRateFuture = scheduler.scheduleAtFixedRate(
() -> System.out.println("Fixed-rate task executed at " + new Date()),
1, 3, TimeUnit.SECONDS
);
// Schedule fixed-delay task
System.out.println("Scheduling fixed-delay task (3 seconds between executions)...");
ScheduledFuture<?> fixedDelayFuture = scheduler.scheduleWithFixedDelay(
() -> {
System.out.println("Fixed-delay task started at " + new Date());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Fixed-delay task completed");
},
1, 3, TimeUnit.SECONDS
);
// Let it run for 10 seconds
Thread.sleep(10000);
// Cancel periodic tasks
fixedRateFuture.cancel(false);
fixedDelayFuture.cancel(false);
scheduler.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
}
public static void demonstrateCustomThreadPool() throws InterruptedException {
System.out.println("\n=== Custom ThreadPool ===");
// Create custom thread pool with specific parameters
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
2, // core pool size
4, // maximum pool size
60, // keep-alive time
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), // queue capacity
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
);
System.out.println("Custom ThreadPool created:");
System.out.println("Core threads: " + customExecutor.getCorePoolSize());
System.out.println("Max threads: " + customExecutor.getMaximumPoolSize());
System.out.println("Queue capacity: " + customExecutor.getQueue().remainingCapacity());
// Submit tasks
for (int i = 1; i <= 8; i++) {
final int taskId = i;
customExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" executing task " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Small delay to see pool behavior
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Monitor pool stats
System.out.printf("Task %d submitted - Pool: %d, Active: %d, Queue: %d%n",
taskId,
customExecutor.getPoolSize(),
customExecutor.getActiveCount(),
customExecutor.getQueue().size());
}
customExecutor.shutdown();
customExecutor.awaitTermination(20, TimeUnit.SECONDS);
}
}
2. Submitting Tasks and Handling Results
Using submit() with Futures
import java.util.concurrent.*;
import java.util.*;
public class TaskSubmissionWithFutures {
static class ComputationTask implements Callable<Long> {
private String taskName;
private int number;
public ComputationTask(String name, int number) {
this.taskName = name;
this.number = number;
}
@Override
public Long call() throws Exception {
System.out.println(Thread.currentThread().getName() +
" starting computation for " + taskName);
// Simulate computation time
Thread.sleep(2000);
// Compute factorial
long result = 1;
for (int i = 2; i <= number; i++) {
result *= i;
}
System.out.println(Thread.currentThread().getName() +
" completed " + taskName + " = " + result);
return result;
}
}
static class FileProcessor implements Callable<String> {
private String fileName;
public FileProcessor(String fileName) {
this.fileName = fileName;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() +
" processing file: " + fileName);
// Simulate file processing
int processingTime = 1000 + (int)(Math.random() * 3000);
Thread.sleep(processingTime);
// Simulate occasional failure
if (Math.random() < 0.2) {
throw new RuntimeException("Failed to process file: " + fileName);
}
return "Processed: " + fileName + " in " + processingTime + "ms";
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== Task Submission with Futures ===");
// 1. Basic Future usage
demonstrateBasicFutures();
// 2. Handling multiple Futures
demonstrateMultipleFutures();
// 3. Exception handling with Futures
demonstrateFutureExceptionHandling();
// 4. Using CompletableFuture
demonstrateCompletableFuture();
}
public static void demonstrateBasicFutures() throws Exception {
System.out.println("\n=== Basic Future Usage ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
// Submit Callable tasks and get Futures
Future<Long> future1 = executor.submit(new ComputationTask("Factorial-5", 5));
Future<Long> future2 = executor.submit(new ComputationTask("Factorial-7", 7));
Future<Long> future3 = executor.submit(new ComputationTask("Factorial-10", 10));
System.out.println("Tasks submitted, checking status...");
// Check if tasks are done
System.out.println("Task1 done: " + future1.isDone());
System.out.println("Task2 done: " + future2.isDone());
System.out.println("Task3 done: " + future3.isDone());
// Get results (this will block until results are available)
System.out.println("Getting results...");
Long result1 = future1.get();
Long result2 = future2.get();
Long result3 = future3.get();
System.out.println("Results: " + result1 + ", " + result2 + ", " + result3);
System.out.println("All tasks completed: " +
(future1.isDone() && future2.isDone() && future3.isDone()));
executor.shutdown();
}
public static void demonstrateMultipleFutures() throws Exception {
System.out.println("\n=== Handling Multiple Futures ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit multiple file processing tasks
List<Future<String>> futures = new ArrayList<>();
String[] files = {"file1.txt", "file2.log", "file3.dat", "file4.csv", "file5.xml"};
for (String file : files) {
Future<String> future = executor.submit(new FileProcessor(file));
futures.add(future);
}
System.out.println("All file processing tasks submitted");
// Process results as they complete
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
try {
// Get with timeout
String result = future.get(5, TimeUnit.SECONDS);
results.add(result);
System.out.println("Got result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timed out, cancelling...");
future.cancel(true);
} catch (ExecutionException e) {
System.out.println("Task failed: " + e.getCause().getMessage());
}
}
System.out.println("Successfully processed " + results.size() + " files");
System.out.println("Results: " + results);
executor.shutdown();
}
public static void demonstrateFutureExceptionHandling() throws InterruptedException {
System.out.println("\n=== Future Exception Handling ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
// Submit tasks that might fail
Future<String> successFuture = executor.submit(() -> {
Thread.sleep(1000);
return "Success result";
});
Future<String> failureFuture = executor.submit(() -> {
Thread.sleep(1000);
throw new IllegalArgumentException("Simulated task failure");
});
// Handle successful future
try {
String result = successFuture.get();
System.out.println("Success task result: " + result);
} catch (ExecutionException e) {
System.out.println("Success task failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for success task");
}
// Handle failed future
try {
String result = failureFuture.get();
System.out.println("Failure task result: " + result);
} catch (ExecutionException e) {
System.out.println("Failure task failed as expected: " + e.getCause().getMessage());
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for failure task");
}
// Check cancellation
Future<String> cancellableFuture = executor.submit(() -> {
Thread.sleep(5000);
return "This should be cancelled";
});
// Cancel after 1 second
Thread.sleep(1000);
boolean cancelled = cancellableFuture.cancel(true);
System.out.println("Task cancelled: " + cancelled);
System.out.println("Task is cancelled: " + cancellableFuture.isCancelled());
executor.shutdown();
}
public static void demonstrateCompletableFuture() throws Exception {
System.out.println("\n=== CompletableFuture ===");
// Using CompletableFuture for asynchronous programming
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " executing async task 1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Result from task 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " executing async task 2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Result from task 2";
});
// Chain futures
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " + " + result2;
});
// Handle completion
combinedFuture.thenAccept(result -> {
System.out.println("Combined result: " + result);
});
// Wait for completion
String finalResult = combinedFuture.get(5, TimeUnit.SECONDS);
System.out.println("Final result: " + finalResult);
// Exception handling with CompletableFuture
CompletableFuture<String> exceptionalFuture = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return "Success";
});
CompletableFuture<String> handledFuture = exceptionalFuture.exceptionally(throwable -> {
return "Recovered from: " + throwable.getMessage();
});
System.out.println("Exception handling result: " + handledFuture.get());
}
}
3. Thread Pool Management and Monitoring
Advanced Thread Pool Management
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class ThreadPoolManagement {
static class MonitoringTask implements Runnable {
private String taskName;
private int duration;
public MonitoringTask(String name, int duration) {
this.taskName = name;
this.duration = duration;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() +
" started " + taskName + " at " + new Date(startTime));
try {
// Simulate work
for (int i = 1; i <= duration; i++) {
System.out.println(Thread.currentThread().getName() +
" - " + taskName + " working... (" + i + "/" + duration + ")");
Thread.sleep(1000);
// Check for interruption
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() +
" interrupted during " + taskName);
return;
}
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() +
" completed " + taskName + " in " +
(endTime - startTime) + "ms");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" interrupted during " + taskName);
Thread.currentThread().interrupt();
}
}
}
static class ThreadPoolMonitor implements Runnable {
private ThreadPoolExecutor executor;
private AtomicBoolean shutdown;
public ThreadPoolMonitor(ThreadPoolExecutor executor, AtomicBoolean shutdown) {
this.executor = executor;
this.shutdown = shutdown;
}
@Override
public void run() {
while (!shutdown.get()) {
System.out.println("\n=== Thread Pool Statistics ===");
System.out.println("Pool Size: " + executor.getPoolSize());
System.out.println("Active Count: " + executor.getActiveCount());
System.out.println("Core Pool Size: " + executor.getCorePoolSize());
System.out.println("Maximum Pool Size: " + executor.getMaximumPoolSize());
System.out.println("Largest Pool Size: " + executor.getLargestPoolSize());
System.out.println("Task Count: " + executor.getTaskCount());
System.out.println("Completed Task Count: " + executor.getCompletedTaskCount());
System.out.println("Queue Size: " + executor.getQueue().size());
System.out.println("Queue Remaining Capacity: " + executor.getQueue().remainingCapacity());
System.out.println("============================\n");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("Monitor interrupted");
break;
}
}
System.out.println("ThreadPoolMonitor stopped");
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== Thread Pool Management and Monitoring ===");
// 1. Thread Pool with Monitoring
demonstrateThreadPoolMonitoring();
// 2. Graceful Shutdown
demonstrateGracefulShutdown();
// 3. Custom Rejection Policies
demonstrateRejectionPolicies();
// 4. Dynamic Thread Pool Adjustment
demonstrateDynamicAdjustment();
}
public static void demonstrateThreadPoolMonitoring() throws InterruptedException {
System.out.println("\n=== Thread Pool Monitoring ===");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
AtomicBoolean shutdown = new AtomicBoolean(false);
// Start monitoring
Thread monitorThread = new Thread(new ThreadPoolMonitor(executor, shutdown));
monitorThread.setDaemon(true);
monitorThread.start();
// Submit tasks
for (int i = 1; i <= 10; i++) {
try {
executor.execute(new MonitoringTask("Task-" + i, 3));
System.out.println("Submitted Task-" + i);
Thread.sleep(500); // Slow submission to see pool behavior
} catch (RejectedExecutionException e) {
System.out.println("Task-" + i + " rejected: " + e.getMessage());
}
}
// Let tasks complete
Thread.sleep(5000);
// Stop monitoring and shutdown
shutdown.set(true);
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Executor terminated: " + terminated);
}
public static void demonstrateGracefulShutdown() throws InterruptedException {
System.out.println("\n=== Graceful Shutdown ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit long-running tasks
for (int i = 1; i <= 6; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" starting long task " + taskId);
try {
Thread.sleep(5000); // Long task
System.out.println(Thread.currentThread().getName() +
" completed task " + taskId);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" interrupted in task " + taskId);
}
});
}
// Wait a bit then shutdown
Thread.sleep(1000);
System.out.println("Initiating shutdown...");
// Step 1: Prevent new tasks
executor.shutdown();
// Step 2: Wait for existing tasks to complete
System.out.println("Waiting for tasks to complete...");
boolean completed = executor.awaitTermination(10, TimeUnit.SECONDS);
if (!completed) {
System.out.println("Timeout reached, forcing shutdown...");
// Step 3: Force shutdown if timeout reached
List<Runnable> unfinishedTasks = executor.shutdownNow();
System.out.println("Cancelled " + unfinishedTasks.size() + " tasks");
// Wait a bit more for threads to respond to interrupt
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
} else {
System.out.println("All tasks completed gracefully");
}
}
public static void demonstrateRejectionPolicies() throws InterruptedException {
System.out.println("\n=== Rejection Policies ===");
// Test different rejection policies
testRejectionPolicy("AbortPolicy", new ThreadPoolExecutor.AbortPolicy());
testRejectionPolicy("CallerRunsPolicy", new ThreadPoolExecutor.CallerRunsPolicy());
testRejectionPolicy("DiscardPolicy", new ThreadPoolExecutor.DiscardPolicy());
testRejectionPolicy("DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy());
}
public static void testRejectionPolicy(String policyName,
RejectedExecutionHandler policy)
throws InterruptedException {
System.out.println("\nTesting " + policyName + ":");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
policy
);
// Submit more tasks than pool can handle
for (int i = 1; i <= 5; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" executing task " + taskId);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Task " + taskId + " submitted successfully");
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " rejected: " + e.getMessage());
}
Thread.sleep(100); // Small delay between submissions
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
public static void demonstrateDynamicAdjustment() throws InterruptedException {
System.out.println("\n=== Dynamic Thread Pool Adjustment ===");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
System.out.println("Initial pool - Core: " + executor.getCorePoolSize() +
", Max: " + executor.getMaximumPoolSize());
// Submit initial tasks
for (int i = 1; i <= 8; i++) {
executor.execute(createShortTask("Initial-Task-" + i));
}
Thread.sleep(2000);
System.out.println("After initial load - Pool: " + executor.getPoolSize() +
", Active: " + executor.getActiveCount());
// Increase core pool size dynamically
System.out.println("Increasing core pool size to 4...");
executor.setCorePoolSize(4);
// Submit more tasks
for (int i = 9; i <= 12; i++) {
executor.execute(createShortTask("Additional-Task-" + i));
}
Thread.sleep(2000);
System.out.println("After adjustment - Pool: " + executor.getPoolSize() +
", Active: " + executor.getActiveCount());
// Decrease core pool size
System.out.println("Decreasing core pool size to 2...");
executor.setCorePoolSize(2);
Thread.sleep(5000); // Wait for keep-alive time
System.out.println("After decrease - Pool: " + executor.getPoolSize() +
", Active: " + executor.getActiveCount());
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
private static Runnable createShortTask(String name) {
return () -> {
System.out.println(Thread.currentThread().getName() + " executing " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
}
}
4. Real-World Examples
Web Server Simulation
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.net.*;
import java.io.*;
public class RealWorldExecutorExamples {
// Simulated web request handler
static class RequestHandler implements Runnable {
private final int requestId;
private final String clientIp;
private static final AtomicInteger requestCounter = new AtomicInteger(0);
public RequestHandler(String clientIp) {
this.requestId = requestCounter.incrementAndGet();
this.clientIp = clientIp;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
long startTime = System.currentTimeMillis();
System.out.println("[" + new Date(startTime) + "] " +
threadName + " handling request #" + requestId + " from " + clientIp);
try {
// Simulate different types of requests
String requestType = simulateRequestType();
int processingTime = simulateProcessingTime(requestType);
System.out.println(threadName + " - Request #" + requestId +
" type: " + requestType + ", estimated time: " +
processingTime + "ms");
// Process request
Thread.sleep(processingTime);
long endTime = System.currentTimeMillis();
System.out.println("[" + new Date(endTime) + "] " +
threadName + " completed request #" + requestId +
" in " + (endTime - startTime) + "ms");
} catch (InterruptedException e) {
System.out.println(threadName + " interrupted while handling request #" + requestId);
Thread.currentThread().interrupt();
}
}
private String simulateRequestType() {
String[] types = {"GET", "POST", "PUT", "DELETE", "API_CALL"};
return types[(int)(Math.random() * types.length)];
}
private int simulateProcessingTime(String requestType) {
switch (requestType) {
case "GET": return 100 + (int)(Math.random() * 400);
case "POST": return 200 + (int)(Math.random() * 600);
case "PUT": return 300 + (int)(Math.random() * 700);
case "DELETE": return 150 + (int)(Math.random() * 500);
case "API_CALL": return 500 + (int)(Math.random() * 1000);
default: return 200;
}
}
}
// Web server simulation
static class WebServer {
private final ThreadPoolExecutor requestExecutor;
private final ScheduledExecutorService monitorExecutor;
private final AtomicInteger activeRequests;
private volatile boolean running;
public WebServer(int minThreads, int maxThreads, int queueCapacity) {
this.activeRequests = new AtomicInteger(0);
this.running = true;
// Create request processing thread pool
this.requestExecutor = new ThreadPoolExecutor(
minThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadFactory() {
private AtomicInteger threadCounter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "RequestHandler-" + threadCounter.getAndIncrement());
thread.setDaemon(true);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
// Create monitoring service
this.monitorExecutor = Executors.newScheduledThreadPool(1);
startMonitoring();
}
public void handleRequest(String clientIp) {
if (!running) {
System.out.println("Server not accepting new requests");
return;
}
activeRequests.incrementAndGet();
RequestHandler handler = new RequestHandler(clientIp);
try {
requestExecutor.execute(() -> {
try {
handler.run();
} finally {
activeRequests.decrementAndGet();
}
});
} catch (RejectedExecutionException e) {
activeRequests.decrementAndGet();
System.out.println("Request rejected from " + clientIp + " - server overloaded");
}
}
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
System.out.println("\n=== Server Status ===");
System.out.println("Active Requests: " + activeRequests.get());
System.out.println("Thread Pool - Pool: " + requestExecutor.getPoolSize() +
", Active: " + requestExecutor.getActiveCount() +
", Queue: " + requestExecutor.getQueue().size());
System.out.println("Completed Tasks: " + requestExecutor.getCompletedTaskCount());
System.out.println("========================\n");
}, 1, 5, TimeUnit.SECONDS);
}
public void shutdown() {
System.out.println("Shutting down web server...");
running = false;
// Stop accepting new requests
monitorExecutor.shutdown();
// Graceful shutdown of request executor
requestExecutor.shutdown();
try {
if (!requestExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("Forcing shutdown...");
requestExecutor.shutdownNow();
}
} catch (InterruptedException e) {
requestExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("Web server shutdown complete");
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Real-World ExecutorService Examples ===");
// 1. Web Server Simulation
demonstrateWebServer();
// 2. Data Processing Pipeline
demonstrateDataProcessingPipeline();
// 3. Batch Job Processing
demonstrateBatchJobProcessing();
}
public static void demonstrateWebServer() throws InterruptedException {
System.out.println("\n=== Web Server Simulation ===");
WebServer server = new WebServer(2, 5, 10);
// Simulate incoming requests
Thread requestSimulator = new Thread(() -> {
for (int i = 1; i <= 20; i++) {
String clientIp = "192.168.1." + (100 + i);
server.handleRequest(clientIp);
// Random delay between requests
try {
Thread.sleep(100 + (int)(Math.random() * 400));
} catch (InterruptedException e) {
break;
}
}
});
requestSimulator.start();
requestSimulator.join();
// Wait for requests to process
Thread.sleep(5000);
server.shutdown();
}
public static void demonstrateDataProcessingPipeline() throws InterruptedException {
System.out.println("\n=== Data Processing Pipeline ===");
ExecutorService extractionExecutor = Executors.newFixedThreadPool(2);
ExecutorService transformationExecutor = Executors.newFixedThreadPool(3);
ExecutorService loadingExecutor = Executors.newFixedThreadPool(2);
CompletionService<String> completionService =
new ExecutorCompletionService<>(loadingExecutor);
// Extract phase
List<Future<String>> extractionFutures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int dataSourceId = i;
Future<String> extractionFuture = extractionExecutor.submit(() -> {
System.out.println("Extracting from source " + dataSourceId);
Thread.sleep(1000 + (int)(Math.random() * 2000));
return "Data from source " + dataSourceId;
});
extractionFutures.add(extractionFuture);
}
// Transform phase (depends on extraction)
List<Future<String>> transformationFutures = new ArrayList<>();
for (Future<String> extractionFuture : extractionFutures) {
Future<String> transformationFuture = transformationExecutor.submit(() -> {
String rawData = extractionFuture.get();
System.out.println("Transforming: " + rawData);
Thread.sleep(1500);
return rawData + " -> TRANSFORMED";
});
transformationFutures.add(transformationFuture);
}
// Load phase (depends on transformation)
for (Future<String> transformationFuture : transformationFutures) {
completionService.submit(() -> {
String transformedData = transformationFuture.get();
System.out.println("Loading: " + transformedData);
Thread.sleep(1000);
return "LOADED: " + transformedData;
});
}
// Collect results
System.out.println("Collecting pipeline results...");
for (int i = 0; i < transformationFutures.size(); i++) {
try {
Future<String> loadedFuture = completionService.take();
String result = loadedFuture.get();
System.out.println("Pipeline result: " + result);
} catch (ExecutionException e) {
System.out.println("Pipeline stage failed: " + e.getCause().getMessage());
}
}
// Shutdown executors
extractionExecutor.shutdown();
transformationExecutor.shutdown();
loadingExecutor.shutdown();
extractionExecutor.awaitTermination(10, TimeUnit.SECONDS);
transformationExecutor.awaitTermination(10, TimeUnit.SECONDS);
loadingExecutor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Data processing pipeline completed");
}
public static void demonstrateBatchJobProcessing() throws InterruptedException {
System.out.println("\n=== Batch Job Processing ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
ExecutorService workerExecutor = Executors.newFixedThreadPool(4);
AtomicInteger jobCounter = new AtomicInteger(0);
CountDownLatch batchLatch = new CountDownLatch(3); // Process 3 batches
// Schedule batch jobs
ScheduledFuture<?> batchJob = scheduler.scheduleAtFixedRate(() -> {
int batchId = jobCounter.incrementAndGet();
System.out.println("Starting batch job #" + batchId + " at " + new Date());
// Submit tasks for this batch
List<Future<String>> batchFutures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
Future<String> future = workerExecutor.submit(() -> {
System.out.println("Batch#" + batchId + " - Processing task " + taskId);
Thread.sleep(1000 + (int)(Math.random() * 2000));
return "Batch#" + batchId + "-Task" + taskId + " completed";
});
batchFutures.add(future);
}
// Wait for batch completion
try {
for (Future<String> future : batchFutures) {
String result = future.get();
System.out.println("Batch#" + batchId + " - " + result);
}
System.out.println("Batch job #" + batchId + " completed");
batchLatch.countDown();
} catch (Exception e) {
System.out.println("Batch job #" + batchId + " failed: " + e.getMessage());
}
}, 0, 10, TimeUnit.SECONDS); // Run every 10 seconds
// Wait for 3 batches to complete
batchLatch.await();
// Cancel further batches
batchJob.cancel(false);
scheduler.shutdown();
workerExecutor.shutdown();
System.out.println("Batch job processing completed");
}
}
Summary
Key ExecutorService Types:
- FixedThreadPool: Fixed number of threads
- CachedThreadPool: Dynamically grows and shrinks
- SingleThreadExecutor: Single thread for sequential execution
- ScheduledThreadPool: For delayed and periodic tasks
- WorkStealingPool: ForkJoinPool for parallel processing
Important Methods:
- execute(Runnable): Submit task without return value
- submit(Callable/Runnable): Submit task and return Future
- invokeAll(): Submit multiple tasks and wait for all
- invokeAny(): Submit multiple tasks and return first completed
- shutdown(): Graceful shutdown
- shutdownNow(): Forceful shutdown
Best Practices:
- Always shutdown ExecutorService to prevent resource leaks
- Use appropriate thread pool size based on task characteristics
- Handle RejectedExecutionException with proper policies
- Use Futures for task results and exception handling
- Monitor thread pool metrics for performance tuning
- Use CompletableFuture for complex asynchronous workflows
- Choose appropriate queue types based on requirements
Common Patterns:
- Producer-Consumer: Thread pool as consumer, task queue as buffer
- Pipeline Processing: Multiple executors for different stages
- Fan-out/Fan-in: Submit multiple tasks, aggregate results
- Periodic Tasks: ScheduledExecutorService for timers and heartbeats
- Resource Pooling: Reuse threads for better performance
ExecutorService provides a robust, efficient way to manage concurrent tasks in Java, making it the preferred approach over manual thread management in most scenarios.