The Executor Framework provides a high-level replacement for working with threads directly. It simplifies asynchronous task execution by separating task submission from the details of how each task is run (thread creation, scheduling, and reuse).
1. Core Components
Executor Interface
import java.util.concurrent.Executor;
/**
 * Smallest possible use of the {@link Executor} interface: hand a task to an
 * executor and let the executor decide how to run it.
 */
public class BasicExecutorExample {
    public static void main(String[] args) {
        // The task only reports which thread ended up running it.
        Runnable reportThread = () ->
                System.out.println("Task executed by: " + Thread.currentThread().getName());

        // DirectExecutor is the custom Executor implementation from this example.
        Executor direct = new DirectExecutor();
        direct.execute(reportThread);
    }
}
/**
 * An {@link Executor} that creates no threads at all: each task is run
 * synchronously by whichever thread calls {@link #execute(Runnable)}.
 */
class DirectExecutor implements Executor {

    /**
     * Runs the given task to completion before returning.
     *
     * @param command the task to run on the calling thread
     */
    @Override
    public void execute(final Runnable command) {
        // No hand-off: the caller's own thread does the work.
        command.run();
    }
}
2. ExecutorService - Most Commonly Used
Creating ExecutorService
import java.util.concurrent.*;
/**
 * Shows the common {@link Executors} factory methods for obtaining an
 * {@link ExecutorService}, and releases every executor once it is no longer needed.
 */
public class ExecutorServiceExample {
    public static void main(String[] args) {
        // Different ways to create ExecutorService

        // Fixed thread pool: at most 5 worker threads
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

        // Cached thread pool: creates threads on demand and reuses idle ones
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

        // Single thread executor: one worker thread
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        // Scheduled executor: 3 core threads for delayed and periodic tasks
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);

        // ... submit work to the executors here ...

        // Always shut executors down when done. Their worker threads are
        // non-daemon by default, so a pool that has run tasks and is never
        // shut down can keep the JVM from exiting.
        fixedThreadPool.shutdown();
        cachedThreadPool.shutdown();
        singleThreadExecutor.shutdown();
        scheduledExecutor.shutdown();
    }
}
3. Practical Examples
Basic Task Execution
import java.util.concurrent.*;
/**
 * Submits ten fire-and-forget tasks to a three-thread pool and then asks the
 * pool to shut down once the submitted work has finished.
 */
public class BasicTaskExecution {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);

        // Queue up tasks 1 through 10
        int taskId = 1;
        while (taskId <= 10) {
            pool.execute(newTask(taskId));
            taskId++;
        }

        // Stop accepting new tasks; tasks already submitted still run
        pool.shutdown();
    }

    /** Builds a task that announces itself and then sleeps for one second. */
    private static Runnable newTask(int taskId) {
        return () -> {
            String worker = Thread.currentThread().getName();
            System.out.println("Executing task " + taskId + " by " + worker);
            try {
                Thread.sleep(1000); // stand-in for real work
            } catch (InterruptedException interrupted) {
                // Restore the flag so the pool can see the interruption
                Thread.currentThread().interrupt();
            }
        };
    }
}
Using Callable and Future
import java.util.concurrent.*;
import java.util.*;
/**
 * Submits {@link Callable} tasks that return a value and collects each result
 * through its {@link Future}, waiting at most two seconds per result.
 */
public class CallableFutureExample {
    /**
     * Runs the example.
     *
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for a result (declared via {@code throws Exception})
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // List to store futures
            List<Future<String>> futures = new ArrayList<>();

            // Submit Callable tasks
            for (int i = 1; i <= 5; i++) {
                final int taskId = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(1000);
                    return "Result from task " + taskId +
                            " executed by " + Thread.currentThread().getName();
                });
                futures.add(future);
            }

            // Retrieve results in submission order
            for (Future<String> future : futures) {
                try {
                    String result = future.get(2, TimeUnit.SECONDS);
                    System.out.println(result);
                } catch (TimeoutException e) {
                    // Nobody will read the result any more, so stop the task
                    future.cancel(true);
                    System.out.println("Task timed out");
                } catch (ExecutionException e) {
                    // The task itself threw; the original exception is the cause
                    System.out.println("Task failed: " + e.getCause());
                }
                // InterruptedException is deliberately not caught here: it
                // propagates so the interruption is not silently swallowed.
            }
        } finally {
            // Runs on every exit path so the pool threads cannot keep the JVM alive
            executor.shutdown();
        }
    }
}
4. ScheduledExecutorService
Scheduled Tasks
import java.util.concurrent.*;
import java.time.LocalTime;
/**
 * Demonstrates one-shot and fixed-rate scheduling with a
 * {@link ScheduledExecutorService}.
 */
public class ScheduledExecutorExample {
    /**
     * Runs the example for about ten seconds.
     *
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            System.out.println("Current time: " + LocalTime.now());

            // Schedule task to run once after a 2 second delay
            scheduler.schedule(() -> {
                System.out.println("One-time task executed at: " + LocalTime.now());
            }, 2, TimeUnit.SECONDS);

            // Schedule task to run every 3 seconds after an initial 1 second delay
            ScheduledFuture<?> periodicFuture = scheduler.scheduleAtFixedRate(() -> {
                System.out.println("Periodic task executed at: " + LocalTime.now());
            }, 1, 3, TimeUnit.SECONDS);

            // Let it run for 10 seconds then cancel
            Thread.sleep(10000);
            periodicFuture.cancel(true);
        } finally {
            // Also reached when sleep() is interrupted; without this the
            // periodic task would keep the JVM running indefinitely.
            scheduler.shutdown();
        }
    }
}
5. Advanced Examples
InvokeAll - Execute Multiple Tasks
import java.util.concurrent.*;
import java.util.*;
/**
 * Runs a batch of tasks with {@code invokeAll}, which blocks until every task
 * has finished, and then prints the results in submission order.
 */
public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException {
        // Four tasks with different simulated durations
        List<Callable<String>> batch = new ArrayList<>();
        batch.add(delayed(2000, "Task 1 completed"));
        batch.add(delayed(1000, "Task 2 completed"));
        batch.add(delayed(3000, "Task 3 completed"));
        batch.add(delayed(1500, "Task 4 completed"));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (Future<String> outcome : pool.invokeAll(batch)) {
                try {
                    System.out.println(outcome.get());
                } catch (ExecutionException failure) {
                    // Report the exception thrown inside the task
                    System.out.println("Task failed: " + failure.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
    }

    /** Builds a task that sleeps for {@code millis} and then returns {@code message}. */
    private static Callable<String> delayed(long millis, String message) {
        return () -> {
            Thread.sleep(millis);
            return message;
        };
    }
}
InvokeAny - Get First Completed Result
import java.util.concurrent.*;
import java.util.*;
/**
 * Runs several tasks with {@code invokeAny}, which returns the result of one
 * task that completed successfully.
 */
public class InvokeAnyExample {
    /**
     * Runs the example.
     *
     * @throws Exception if the main thread is interrupted or no task completes
     *         successfully
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Despite the labels, every task sleeps for a random duration drawn
            // from the same range, so any of the three may finish first.
            List<Callable<String>> tasks = Arrays.asList(
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Fast Task";
                    },
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Medium Task";
                    },
                    () -> {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                        return "Slow Task";
                    }
            );

            String result = executor.invokeAny(tasks);
            System.out.println("First completed: " + result);
        } finally {
            // invokeAny can throw; shutting down here guarantees the pool
            // threads never outlive a failed run.
            executor.shutdown();
        }
    }
}
6. ThreadPoolExecutor - Custom Configuration
Custom Thread Pool
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Builds a {@link ThreadPoolExecutor} by hand with a custom thread factory, a
 * bounded work queue and the {@code CallerRunsPolicy} rejection policy.
 */
public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // Custom thread factory: named daemon threads at normal priority
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "CustomThread-" + counter.getAndIncrement());
                thread.setDaemon(true);
                thread.setPriority(Thread.NORM_PRIORITY);
                return thread;
            }
        };

        // Custom thread pool with rejection policy
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // core pool size
                5, // maximum pool size
                60, // keep alive time
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), // work queue
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
        );

        // Submit tasks. With CallerRunsPolicy a task the pool cannot accept is
        // run by the submitting thread rather than rejected with a
        // RejectedExecutionException, so no catch block is needed here; such
        // tasks print the submitting thread's name instead of "CustomThread-N".
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("Task " + taskId +
                        " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Monitor thread pool (a snapshot taken right after submission)
        System.out.println("Pool size: " + executor.getPoolSize());
        System.out.println("Active count: " + executor.getActiveCount());
        System.out.println("Queue size: " + executor.getQueue().size());

        // Graceful shutdown: the workers are daemon threads, so main must wait
        // for them or the JVM could exit before the queued tasks have run.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
7. CompletionService - Process Results as They Complete
import java.util.concurrent.*;
import java.util.*;
/**
 * Uses a {@link CompletionService} to consume task results in the order the
 * tasks finish rather than the order they were submitted.
 */
public class CompletionServiceExample {
    /** Number of tasks submitted, and therefore the number of results to take. */
    private static final int TASK_COUNT = 5;

    /**
     * Runs the example.
     *
     * @throws Exception if the main thread is interrupted or a task fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(executor);

            // Submit tasks
            for (int i = 1; i <= TASK_COUNT; i++) {
                final int taskId = i;
                completionService.submit(() -> {
                    int sleepTime = ThreadLocalRandom.current().nextInt(1000, 5000);
                    Thread.sleep(sleepTime);
                    return "Task " + taskId + " completed after " + sleepTime + "ms";
                });
            }

            // Process results as they complete; take() blocks until the next
            // finished task is available.
            for (int i = 0; i < TASK_COUNT; i++) {
                Future<String> completedFuture = completionService.take();
                String result = completedFuture.get();
                System.out.println("Received: " + result);
            }
        } finally {
            // take() and get() can throw; always release the pool threads
            executor.shutdown();
        }
    }
}
8. Best Practices and Shutdown
Proper Shutdown Pattern
import java.util.concurrent.*;

/**
 * Shows the two-phase shutdown sequence for an {@link ExecutorService}: first
 * stop accepting work and wait for it to drain, then cancel whatever is still
 * running.
 */
public class ProperShutdownExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // Submit work
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    // Task implementation
                });
            }
        } finally {
            // Proper shutdown sequence
            executor.shutdown(); // Disable new tasks
            try {
                // Wait for existing tasks to terminate
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // Cancel currently executing tasks
                    // Wait again for tasks to respond to being cancelled
                    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                        System.err.println("Executor did not terminate");
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: cancel outstanding tasks and
                // restore the interrupt flag for the caller.
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}
Key Benefits of Executor Framework
- Thread Reuse - Reduces thread creation overhead
- Resource Management - Controls number of concurrent threads
- Task Submission - Simple API for submitting Runnable/Callable tasks
- Result Handling - Future objects for asynchronous result retrieval
- Lifecycle Management - Proper startup and shutdown mechanisms
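- Exception Handling - Exceptions thrown by tasks submitted via submit() are captured in the Future and rethrown from get() wrapped in an ExecutionException, rather than only reaching the thread's uncaught-exception handler as with raw threads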
The Executor Framework is essential for building efficient, scalable concurrent applications in Java.