Executor Framework in Java

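The Executor Framework (in the java.util.concurrent package) provides a high-level replacement for creating and managing threads by hand. It separates submitting a task from the details of how, when, and on which thread the task runs, which makes asynchronous execution much simpler.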

1. Core Components

Executor Interface

import java.util.concurrent.Executor;
/**
 * Demonstrates the bare {@link Executor} contract: a task is handed to a
 * hand-written executor, and the task prints the name of the thread that ran it.
 */
public class BasicExecutorExample {
    public static void main(String[] args) {
        // The task is defined first so the hand-off below reads as a single call.
        Runnable reportThread = () ->
                System.out.println("Task executed by: " + Thread.currentThread().getName());

        Executor executor = new DirectExecutor(); // Custom implementation
        executor.execute(reportThread);
    }
}
/**
 * An {@link Executor} that runs every task synchronously on the thread that
 * submitted it, rather than handing it to another thread.
 */
class DirectExecutor implements Executor {
    /**
     * Runs {@code command} to completion before returning to the caller.
     *
     * @param command the task to run
     */
    @Override
    public void execute(Runnable command) {
        // No hand-off: the calling thread does the work itself.
        command.run();
    }
}

2. ExecutorService - Most Commonly Used

Creating ExecutorService

import java.util.concurrent.*;
/**
 * Shows the common {@link Executors} factory methods for obtaining an
 * {@link ExecutorService}, and releases each executor once it is no longer needed.
 */
public class ExecutorServiceExample {
    public static void main(String[] args) {
        // Different ways to create ExecutorService

        // Fixed thread pool: at most 5 worker threads
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        // Cached thread pool: creates threads on demand and reuses idle ones
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        // Single thread executor: tasks run one at a time, in submission order
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        // Scheduled executor: supports delayed and periodic tasks
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);

        // Every executor must be shut down when it is no longer needed;
        // once tasks have been submitted, its worker threads are non-daemon
        // and would otherwise keep the JVM from exiting.
        fixedThreadPool.shutdown();
        cachedThreadPool.shutdown();
        singleThreadExecutor.shutdown();
        scheduledExecutor.shutdown();
    }
}

3. Practical Examples

Basic Task Execution

import java.util.concurrent.*;
/**
 * Submits ten fire-and-forget tasks to a three-thread pool and then requests
 * a graceful shutdown.
 */
public class BasicTaskExecution {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        // Submit tasks
        for (int id = 1; id <= 10; id++) {
            pool.execute(newTask(id));
        }

        // Graceful shutdown: already-submitted tasks still run to completion
        pool.shutdown();
    }

    /** Builds a task that announces itself and then sleeps for one second. */
    private static Runnable newTask(int taskId) {
        return () -> {
            String worker = Thread.currentThread().getName();
            System.out.println("Executing task " + taskId + " by " + worker);
            try {
                Thread.sleep(1000); // Simulate work
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
            }
        };
    }
}

Using Callable and Future

import java.util.concurrent.*;
import java.util.*;
/**
 * Submits five {@link Callable} tasks, then collects each result through its
 * {@link Future} with a two-second timeout per result.
 */
public class CallableFutureExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // List to store futures
            List<Future<String>> futures = new ArrayList<>();

            // Submit Callable tasks
            for (int i = 1; i <= 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(1000);
                    return "Result from task " + taskId +
                            " executed by " + Thread.currentThread().getName();
                });
                futures.add(future);
            }

            // Retrieve results
            for (Future<String> future : futures) {
                try {
                    String result = future.get(2, TimeUnit.SECONDS);
                    System.out.println(result);
                } catch (TimeoutException e) {
                    System.out.println("Task timed out");
                    // Nobody will read this result any more; stop the task so it
                    // does not keep occupying a worker or delay shutdown.
                    future.cancel(true);
                } catch (ExecutionException e) {
                    // The task itself threw; the original exception is the cause.
                    System.out.println("Task failed: " + e.getMessage());
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Always release the pool, even if submission or retrieval fails.
            executor.shutdown();
        }
    }
}

4. ScheduledExecutorService

Scheduled Tasks

import java.util.concurrent.*;
import java.time.LocalTime;
/**
 * Schedules a one-shot task and a fixed-rate periodic task, lets them run for
 * ten seconds, then cancels the periodic task and shuts the scheduler down.
 */
public class ScheduledExecutorExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            System.out.println("Current time: " + LocalTime.now());

            // Schedule task to run after 2 seconds delay.
            // (schedule() returns a ScheduledFuture; it is not needed here.)
            scheduler.schedule(() -> {
                System.out.println("One-time task executed at: " + LocalTime.now());
            }, 2, TimeUnit.SECONDS);

            // Schedule task to run repeatedly every 3 seconds after initial 1 second delay
            ScheduledFuture<?> periodicFuture = scheduler.scheduleAtFixedRate(() -> {
                System.out.println("Periodic task executed at: " + LocalTime.now());
            }, 1, 3, TimeUnit.SECONDS);

            // Let it run for 10 seconds then cancel
            Thread.sleep(10000);
            periodicFuture.cancel(true);
        } finally {
            // Runs even when sleep() is interrupted. Without it the periodic task
            // would keep firing on a non-daemon thread and the JVM would never exit.
            // Under the default policy, shutdown() also stops periodic tasks.
            scheduler.shutdown();
        }
    }
}

5. Advanced Examples

InvokeAll - Execute Multiple Tasks

import java.util.concurrent.*;
import java.util.*;
/**
 * Runs four tasks of different durations with {@code invokeAll}, which blocks
 * until all of them have finished, and prints each result in submission order.
 */
public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
                sleepThenReturn(2000, "Task 1 completed"),
                sleepThenReturn(1000, "Task 2 completed"),
                sleepThenReturn(3000, "Task 3 completed"),
                sleepThenReturn(1500, "Task 4 completed")
        );

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (Future<String> finished : pool.invokeAll(tasks)) {
                try {
                    System.out.println(finished.get());
                } catch (ExecutionException failure) {
                    System.out.println("Task failed: " + failure.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
    }

    /** Builds a task that sleeps for {@code millis} and then yields {@code message}. */
    private static Callable<String> sleepThenReturn(long millis, String message) {
        return () -> {
            Thread.sleep(millis);
            return message;
        };
    }
}

InvokeAny - Get the Result of the First Task to Complete Successfully

import java.util.concurrent.*;
import java.util.*;
/**
 * Races three tasks with {@code invokeAny} and prints the result of whichever
 * one completes successfully first.
 *
 * <p>Note: despite their labels, all three tasks sleep for a random time drawn
 * from the same range (1000 ms inclusive to 5000 ms exclusive), so any of them
 * may win.
 */
public class InvokeAnyExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = Arrays.asList(
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Fast Task";
                    },
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Medium Task";
                    },
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Slow Task";
                    }
            );

            // Blocks until one task succeeds; tasks that have not completed are cancelled.
            String result = executor.invokeAny(tasks);
            System.out.println("First completed: " + result);
        } finally {
            // invokeAny throws ExecutionException when every task fails. Without the
            // finally block the pool's non-daemon threads would keep the JVM alive.
            executor.shutdown();
        }
    }
}

6. ThreadPoolExecutor - Custom Configuration

Custom Thread Pool

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Configures a {@link ThreadPoolExecutor} by hand with a bounded queue, a
 * naming thread factory and a caller-runs rejection policy. It then pushes 20
 * short tasks through the pool and prints a few pool statistics.
 */
public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // Custom thread pool with rejection policy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // core pool size
                5,                                          // maximum pool size
                60,                                         // keep alive time
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),               // work queue
                namedDaemonThreads(),
                new ThreadPoolExecutor.CallerRunsPolicy()   // rejection policy
        );

        // Submit tasks
        for (int id = 1; id <= 20; id++) {
            submit(pool, id);
        }

        // Monitor thread pool
        System.out.println("Pool size: " + pool.getPoolSize());
        System.out.println("Active count: " + pool.getActiveCount());
        System.out.println("Queue size: " + pool.getQueue().size());

        // Graceful shutdown, escalating to a forced one after 60 seconds
        pool.shutdown();
        try {
            boolean drained = pool.awaitTermination(60, TimeUnit.SECONDS);
            if (!drained) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Custom thread factory: daemon threads named CustomThread-1, CustomThread-2, ... */
    private static ThreadFactory namedDaemonThreads() {
        AtomicInteger nextId = new AtomicInteger(1);
        return runnable -> {
            Thread worker = new Thread(runnable, "CustomThread-" + nextId.getAndIncrement());
            worker.setDaemon(true);
            worker.setPriority(Thread.NORM_PRIORITY);
            return worker;
        };
    }

    /** Hands one half-second task to the pool, reporting it if the pool refuses it. */
    private static void submit(ThreadPoolExecutor pool, int taskId) {
        try {
            pool.execute(() -> {
                System.out.println("Task " + taskId +
                        " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } catch (RejectedExecutionException e) {
            // Defensive: CallerRunsPolicy normally runs an overflowing task on the
            // submitting thread instead of throwing.
            System.out.println("Task " + taskId + " was rejected");
        }
    }
}

7. CompletionService - Process Results as They Complete

import java.util.concurrent.*;
import java.util.*;
/**
 * Submits several tasks of random duration through a {@link CompletionService}
 * and prints each result as soon as it becomes available, in completion order
 * rather than submission order.
 */
public class CompletionServiceExample {
    /**
     * Number of tasks submitted and therefore number of results to take.
     * One constant drives both loops: taking more results than were submitted
     * would block forever in {@code take()}.
     */
    private static final int TASK_COUNT = 5;

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(executor);

            // Submit tasks
            for (int i = 1; i <= TASK_COUNT; i++) {
                final int taskId = i;
                completionService.submit(() -> {
                    int sleepTime = ThreadLocalRandom.current().nextInt(1000, 5000);
                    Thread.sleep(sleepTime);
                    return "Task " + taskId + " completed after " + sleepTime + "ms";
                });
            }

            // Process results as they complete
            for (int i = 0; i < TASK_COUNT; i++) {
                Future<String> completedFuture = completionService.take();
                String result = completedFuture.get();
                System.out.println("Received: " + result);
            }
        } finally {
            // Release the pool even if take() is interrupted or get() reports a failure.
            executor.shutdown();
        }
    }
}

8. Best Practices and Shutdown

Proper Shutdown Pattern

import java.util.concurrent.*;

/**
 * Shows the recommended two-phase shutdown of an {@link ExecutorService}:
 * stop accepting work, wait for running tasks, and force-cancel if they
 * take too long.
 */
public class ProperShutdownExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // Submit work
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    // Task implementation
                });
            }
        } finally {
            // Runs even if submitting work throws.
            shutdownAndAwaitTermination(executor);
        }
    }

    /**
     * Shuts {@code pool} down in two phases.
     *
     * <p>It first rejects new tasks and waits up to 60 seconds for submitted
     * ones to finish. If they do not, it cancels them and waits up to another
     * 60 seconds for them to react.
     */
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks
        try {
            // Wait for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait again for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // The waiting thread was interrupted: cancel outstanding work
            // and preserve the interrupt status for the caller.
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Key Benefits of Executor Framework

  1. Thread Reuse - Reduces thread creation overhead
  2. Resource Management - Controls number of concurrent threads
  3. Task Submission - Simple API for submitting Runnable/Callable tasks
  4. Result Handling - Future objects for asynchronous result retrieval
  5. Lifecycle Management - Proper startup and shutdown mechanisms
  6. Exception Handling - Failures of submitted tasks are captured in the Future and reported as ExecutionException, instead of being lost on a raw thread

The Executor Framework is essential for building efficient, scalable concurrent applications in Java.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper