Comprehensive guide to using Semaphore for controlling access to limited resources and implementing resource pools.
1. Semaphore Basics
Basic Semaphore Usage
import java.util.concurrent.Semaphore;
public class BasicSemaphoreExample {
public static void main(String[] args) {
// Create a semaphore with 3 permits
Semaphore semaphore = new Semaphore(3);
// Create multiple threads that need to acquire permits
for (int i = 1; i <= 10; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " waiting for permit");
semaphore.acquire(); // Block until permit is available
System.out.println("Thread " + threadId + " acquired permit");
// Simulate work with the resource
Thread.sleep(2000);
System.out.println("Thread " + threadId + " releasing permit");
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Stagger thread creation
try { Thread.sleep(200); } catch (InterruptedException e) {}
}
}
}
Semaphore with Try-Acquire
public class TryAcquireSemaphore {
private final Semaphore semaphore;
public TryAcquireSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public boolean tryAccessResource() {
// Non-blocking attempt to acquire permit
if (semaphore.tryAcquire()) {
try {
System.out.println(Thread.currentThread().getName() + " acquired permit");
// Use the resource
return true;
} finally {
semaphore.release();
}
} else {
System.out.println(Thread.currentThread().getName() + " could not acquire permit");
return false;
}
}
public boolean tryAccessResourceWithTimeout(long timeout, TimeUnit unit)
throws InterruptedException {
// Try to acquire with timeout
if (semaphore.tryAcquire(timeout, unit)) {
try {
System.out.println(Thread.currentThread().getName() + " acquired permit after waiting");
// Use the resource
return true;
} finally {
semaphore.release();
}
} else {
System.out.println(Thread.currentThread().getName() + " timeout waiting for permit");
return false;
}
}
public void demonstrateTryAcquire() throws InterruptedException {
Semaphore sem = new Semaphore(2);
// Acquire both permits
sem.acquire(2);
Thread thread = new Thread(() -> {
boolean acquired = sem.tryAcquire();
System.out.println("Try acquire result: " + acquired);
try {
boolean acquiredWithTimeout = sem.tryAcquire(1, TimeUnit.SECONDS);
System.out.println("Try acquire with timeout result: " + acquiredWithTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.start();
Thread.sleep(500);
// Release one permit
sem.release();
thread.join();
}
}
2. Database Connection Pool
Basic Connection Pool with Semaphore
public class DatabaseConnectionPool {
private final Semaphore semaphore;
private final List<Connection> availableConnections;
private final List<Connection> usedConnections;
private final int poolSize;
public DatabaseConnectionPool(int poolSize) {
this.poolSize = poolSize;
this.semaphore = new Semaphore(poolSize, true); // Fair semaphore
this.availableConnections = new ArrayList<>(poolSize);
this.usedConnections = new ArrayList<>(poolSize);
initializePool();
}
private void initializePool() {
for (int i = 0; i < poolSize; i++) {
availableConnections.add(createConnection());
}
System.out.println("Initialized connection pool with " + poolSize + " connections");
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // Wait for available permit
return getAvailableConnection();
}
public Connection getConnection(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (semaphore.tryAcquire(timeout, unit)) {
return getAvailableConnection();
} else {
throw new TimeoutException("Timeout waiting for database connection");
}
}
private synchronized Connection getAvailableConnection() {
Connection connection = availableConnections.remove(availableConnections.size() - 1);
usedConnections.add(connection);
System.out.println("Connection acquired. Available: " + availableConnections.size() +
", Used: " + usedConnections.size());
return connection;
}
public synchronized void releaseConnection(Connection connection) {
if (usedConnections.remove(connection)) {
availableConnections.add(connection);
semaphore.release(); // Release the permit
System.out.println("Connection released. Available: " + availableConnections.size() +
", Used: " + usedConnections.size());
}
}
public synchronized void closeAllConnections() {
for (Connection conn : availableConnections) {
closeConnection(conn);
}
for (Connection conn : usedConnections) {
closeConnection(conn);
}
availableConnections.clear();
usedConnections.clear();
}
public synchronized int getAvailableConnectionsCount() {
return availableConnections.size();
}
public synchronized int getUsedConnectionsCount() {
return usedConnections.size();
}
// Mock connection implementation
private Connection createConnection() {
return new MockConnection();
}
private void closeConnection(Connection connection) {
// Close the connection
System.out.println("Closing connection: " + connection);
}
// Mock Connection class
static class MockConnection implements Connection {
private final String id = UUID.randomUUID().toString().substring(0, 8);
@Override
public String toString() {
return "Connection-" + id;
}
// Implement other Connection methods as needed...
@Override public void close() {}
@Override public boolean isClosed() { return false; }
// ... other Connection interface methods
}
// Demonstration
public static void main(String[] args) throws Exception {
DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("Task " + taskId + " waiting for connection");
Connection conn = pool.getConnection();
try {
System.out.println("Task " + taskId + " using connection: " + conn);
// Simulate database operation
Thread.sleep(2000);
System.out.println("Task " + taskId + " completed work");
} finally {
pool.releaseConnection(conn);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(500); // Stagger task submission
}
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
pool.closeAllConnections();
}
}
3. Advanced Connection Pool with Validation
Robust Connection Pool
public class RobustConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<PooledConnection> availableConnections;
private final Set<PooledConnection> usedConnections;
private final int maxPoolSize;
private final ConnectionFactory connectionFactory;
private volatile boolean closed = false;
public RobustConnectionPool(int maxPoolSize, ConnectionFactory connectionFactory) {
this.maxPoolSize = maxPoolSize;
this.connectionFactory = connectionFactory;
this.semaphore = new Semaphore(maxPoolSize, true);
this.availableConnections = new LinkedBlockingQueue<>(maxPoolSize);
this.usedConnections = new ConcurrentHashSet<>();
initializePool();
}
private void initializePool() {
for (int i = 0; i < Math.min(2, maxPoolSize); i++) { // Start with 2 connections
createAndAddConnection();
}
}
private void createAndAddConnection() {
try {
Connection rawConnection = connectionFactory.createConnection();
PooledConnection pooledConnection = new PooledConnection(rawConnection);
availableConnections.offer(pooledConnection);
} catch (SQLException e) {
System.err.println("Failed to create connection: " + e.getMessage());
}
}
public Connection getConnection() throws SQLException, InterruptedException {
if (closed) {
throw new SQLException("Connection pool is closed");
}
semaphore.acquire();
return getAvailableConnection();
}
public Connection getConnection(long timeout, TimeUnit unit)
throws SQLException, InterruptedException, TimeoutException {
if (closed) {
throw new SQLException("Connection pool is closed");
}
if (!semaphore.tryAcquire(timeout, unit)) {
throw new TimeoutException("Timeout waiting for connection");
}
return getAvailableConnection();
}
private Connection getAvailableConnection() throws SQLException {
try {
PooledConnection connection = availableConnections.poll(1, TimeUnit.SECONDS);
if (connection != null && connection.isValid(1)) {
usedConnections.add(connection);
return connection;
} else {
// Create new connection if none available or existing is invalid
Connection newConnection = connectionFactory.createConnection();
PooledConnection pooledConnection = new PooledConnection(newConnection);
usedConnections.add(pooledConnection);
return pooledConnection;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException("Interrupted while waiting for connection", e);
} catch (SQLException e) {
semaphore.release(); // Release permit if connection creation failed
throw e;
}
}
private void returnConnection(PooledConnection connection) {
if (usedConnections.remove(connection)) {
if (connection.isValid(1) && availableConnections.size() < maxPoolSize) {
availableConnections.offer(connection);
} else {
closeConnectionQuietly(connection.getRealConnection());
// Create replacement connection if pool is not full
if (availableConnections.size() < maxPoolSize && !closed) {
createAndAddConnection();
}
}
semaphore.release();
}
}
public void close() {
closed = true;
// Close all connections
availableConnections.forEach(conn ->
closeConnectionQuietly(conn.getRealConnection()));
availableConnections.clear();
usedConnections.forEach(conn ->
closeConnectionQuietly(conn.getRealConnection()));
usedConnections.clear();
}
private void closeConnectionQuietly(Connection connection) {
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
// Log but don't throw
System.err.println("Error closing connection: " + e.getMessage());
}
}
public int getAvailableCount() {
return availableConnections.size();
}
public int getUsedCount() {
return usedConnections.size();
}
public int getTotalCount() {
return getAvailableCount() + getUsedCount();
}
// PooledConnection wrapper
private class PooledConnection implements Connection {
private final Connection realConnection;
private volatile boolean isClosed = false;
public PooledConnection(Connection realConnection) {
this.realConnection = realConnection;
}
public Connection getRealConnection() {
return realConnection;
}
@Override
public void close() throws SQLException {
if (!isClosed) {
isClosed = true;
returnConnection(this);
}
}
public boolean isValid(int timeout) {
try {
return !realConnection.isClosed() && realConnection.isValid(timeout);
} catch (SQLException e) {
return false;
}
}
// Delegate all other Connection methods to realConnection
@Override public Statement createStatement() throws SQLException {
return realConnection.createStatement();
}
@Override public PreparedStatement prepareStatement(String sql) throws SQLException {
return realConnection.prepareStatement(sql);
}
// ... implement all other Connection interface methods
}
// Connection factory interface
public interface ConnectionFactory {
Connection createConnection() throws SQLException;
}
}
4. Thread Pool with Semaphore Control
Bounded Thread Pool
public class BoundedThreadPool {
private final ExecutorService executor;
private final Semaphore semaphore;
private final int maxConcurrentTasks;
public BoundedThreadPool(int poolSize, int maxConcurrentTasks) {
this.executor = Executors.newFixedThreadPool(poolSize);
this.semaphore = new Semaphore(maxConcurrentTasks);
this.maxConcurrentTasks = maxConcurrentTasks;
}
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(() -> {
semaphore.acquire();
try {
return task.call();
} finally {
semaphore.release();
}
});
}
public Future<?> submit(Runnable task) {
return executor.submit(() -> {
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
});
}
public void shutdown() {
executor.shutdown();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}
public int getAvailablePermits() {
return semaphore.availablePermits();
}
public int getQueueLength() {
return semaphore.getQueueLength();
}
// Demonstration
public static void main(String[] args) throws Exception {
BoundedThreadPool boundedPool = new BoundedThreadPool(10, 3); // Max 3 concurrent tasks
List<Future<String>> futures = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
final int taskId = i;
Future<String> future = boundedPool.submit(() -> {
System.out.println("Task " + taskId + " started. Available permits: " +
boundedPool.getAvailablePermits());
Thread.sleep(3000); // Simulate work
return "Result from task " + taskId;
});
futures.add(future);
Thread.sleep(500); // Stagger submission
}
// Collect results
for (Future<String> future : futures) {
System.out.println("Completed: " + future.get());
}
boundedPool.shutdown();
boundedPool.awaitTermination(1, TimeUnit.MINUTES);
}
}
5. Rate Limiting with Semaphore
API Rate Limiter
public class RateLimiter {
private final Semaphore semaphore;
private final int maxPermits;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
this.maxPermits = permitsPerSecond;
this.semaphore = new Semaphore(permitsPerSecond);
this.scheduler = Executors.newScheduledThreadPool(1);
// Schedule permit replenishment
scheduler.scheduleAtFixedRate(() -> {
int currentPermits = semaphore.availablePermits();
int permitsToRelease = maxPermits - currentPermits;
if (permitsToRelease > 0) {
semaphore.release(permitsToRelease);
}
}, 1, 1, TimeUnit.SECONDS);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
return semaphore.tryAcquire(timeout, unit);
}
public void shutdown() {
scheduler.shutdown();
}
// Demonstration
public static void main(String[] args) throws Exception {
RateLimiter rateLimiter = new RateLimiter(5); // 5 requests per second
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 20; i++) {
final int requestId = i;
executor.submit(() -> {
if (rateLimiter.tryAcquire()) {
System.out.println("Request " + requestId + " allowed at " +
System.currentTimeMillis());
// Process request
} else {
System.out.println("Request " + requestId + " rate limited");
}
});
Thread.sleep(100); // Stagger requests
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
rateLimiter.shutdown();
}
}
Token Bucket Rate Limiter
public class TokenBucketRateLimiter {
private final Semaphore semaphore;
private final int capacity;
private final ScheduledExecutorService refillScheduler;
public TokenBucketRateLimiter(int capacity, int refillRate, TimeUnit refillUnit) {
this.capacity = capacity;
this.semaphore = new Semaphore(capacity);
this.refillScheduler = Executors.newScheduledThreadPool(1);
// Calculate refill period
long refillPeriod = refillUnit.toNanos(1) / refillRate;
// Schedule token refill
refillScheduler.scheduleAtFixedRate(() -> {
if (semaphore.availablePermits() < capacity) {
semaphore.release();
}
}, refillPeriod, refillPeriod, TimeUnit.NANOSECONDS);
}
public boolean tryConsume() {
return semaphore.tryAcquire();
}
public void consume() throws InterruptedException {
semaphore.acquire();
}
public boolean tryConsume(int permits) {
return semaphore.tryAcquire(permits);
}
public void consume(int permits) throws InterruptedException {
semaphore.acquire(permits);
}
public int getAvailableTokens() {
return semaphore.availablePermits();
}
public void shutdown() {
refillScheduler.shutdown();
}
}
6. Resource Pool for Expensive Objects
Object Pool with Semaphore
public class ObjectPool<T> {
private final Semaphore semaphore;
private final BlockingQueue<T> availableObjects;
private final Set<T> usedObjects;
private final ObjectFactory<T> objectFactory;
private final int maxSize;
private volatile boolean closed = false;
public ObjectPool(int maxSize, ObjectFactory<T> objectFactory) {
this.maxSize = maxSize;
this.objectFactory = objectFactory;
this.semaphore = new Semaphore(maxSize, true);
this.availableObjects = new LinkedBlockingQueue<>(maxSize);
this.usedObjects = ConcurrentHashMap.newKeySet();
initializePool();
}
private void initializePool() {
for (int i = 0; i < Math.min(3, maxSize); i++) {
createAndAddObject();
}
}
private void createAndAddObject() {
try {
T object = objectFactory.create();
availableObjects.offer(object);
} catch (Exception e) {
System.err.println("Failed to create object: " + e.getMessage());
}
}
public T acquire() throws InterruptedException {
if (closed) {
throw new IllegalStateException("Pool is closed");
}
semaphore.acquire();
return acquireObject();
}
public T acquire(long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (closed) {
throw new IllegalStateException("Pool is closed");
}
if (!semaphore.tryAcquire(timeout, unit)) {
throw new TimeoutException("Timeout waiting for object");
}
return acquireObject();
}
private T acquireObject() {
try {
T object = availableObjects.poll(1, TimeUnit.SECONDS);
if (object != null && objectFactory.validate(object)) {
usedObjects.add(object);
return object;
} else {
// Create new object
T newObject = objectFactory.create();
usedObjects.add(newObject);
return newObject;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
semaphore.release(); // Release permit on interruption
throw new RuntimeException("Interrupted while acquiring object", e);
} catch (Exception e) {
semaphore.release(); // Release permit on creation failure
throw new RuntimeException("Failed to acquire object", e);
}
}
public void release(T object) {
if (usedObjects.remove(object)) {
if (objectFactory.validate(object) && availableObjects.size() < maxSize) {
availableObjects.offer(object);
} else {
objectFactory.destroy(object);
if (!closed && availableObjects.size() < maxSize) {
createAndAddObject();
}
}
semaphore.release();
}
}
public void close() {
closed = true;
// Destroy all objects
availableObjects.forEach(objectFactory::destroy);
availableObjects.clear();
usedObjects.forEach(objectFactory::destroy);
usedObjects.clear();
}
public int getAvailableCount() {
return availableObjects.size();
}
public int getUsedCount() {
return usedObjects.size();
}
// Object factory interface
public interface ObjectFactory<T> {
T create();
boolean validate(T object);
void destroy(T object);
}
// Example: Expensive parser pool
public static class ParserPool extends ObjectPool<DocumentBuilder> {
public ParserPool(int maxSize) {
super(maxSize, new ParserFactory());
}
public String parseXml(String xml) throws Exception {
DocumentBuilder parser = acquire();
try {
Document doc = parser.parse(new InputSource(new StringReader(xml)));
return doc.getDocumentElement().getTextContent();
} finally {
release(parser);
}
}
}
static class ParserFactory implements ObjectFactory<DocumentBuilder> {
@Override
public DocumentBuilder create() {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
return factory.newDocumentBuilder();
} catch (ParserConfigurationException e) {
throw new RuntimeException("Failed to create parser", e);
}
}
@Override
public boolean validate(DocumentBuilder parser) {
return parser != null;
}
@Override
public void destroy(DocumentBuilder parser) {
// Parser doesn't need explicit cleanup
}
}
}
7. Producer-Consumer with Bounded Buffer
Bounded Buffer with Semaphore
public class BoundedBuffer<T> {
private final Semaphore emptySlots;
private final Semaphore fullSlots;
private final Queue<T> buffer;
private final int capacity;
public BoundedBuffer(int capacity) {
this.capacity = capacity;
this.emptySlots = new Semaphore(capacity);
this.fullSlots = new Semaphore(0);
this.buffer = new LinkedList<>();
}
public void put(T item) throws InterruptedException {
emptySlots.acquire(); // Wait for empty slot
synchronized (this) {
buffer.offer(item);
}
fullSlots.release(); // Signal that a slot is full
}
public T take() throws InterruptedException {
fullSlots.acquire(); // Wait for full slot
T item;
synchronized (this) {
item = buffer.poll();
}
emptySlots.release(); // Signal that a slot is empty
return item;
}
public boolean offer(T item) {
if (emptySlots.tryAcquire()) {
synchronized (this) {
buffer.offer(item);
}
fullSlots.release();
return true;
}
return false;
}
public T poll() {
if (fullSlots.tryAcquire()) {
T item;
synchronized (this) {
item = buffer.poll();
}
emptySlots.release();
return item;
}
return null;
}
public int size() {
synchronized (this) {
return buffer.size();
}
}
public boolean isEmpty() {
synchronized (this) {
return buffer.isEmpty();
}
}
public boolean isFull() {
return emptySlots.availablePermits() == 0;
}
// Demonstration
public static void main(String[] args) throws Exception {
BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);
// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
buffer.put(i);
System.out.println("Produced: " + i + " (Buffer size: " + buffer.size() + ")");
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
Integer item = buffer.take();
System.out.println("Consumed: " + item + " (Buffer size: " + buffer.size() + ")");
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
}
8. Best Practices and Common Patterns
Error Handling and Resource Management
public class SemaphoreBestPractices {
// ✅ GOOD: Always release in finally block
public void properResourceHandling(Semaphore semaphore) {
try {
semaphore.acquire();
// Use the resource
performCriticalOperation();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Handle interruption
} finally {
semaphore.release();
}
}
// ✅ GOOD: Use try-with-resources pattern
public void withAutoCloseable(Semaphore semaphore) throws Exception {
try (SemaphoreResource resource = new SemaphoreResource(semaphore)) {
resource.use();
} // Semaphore automatically released
}
// ✅ GOOD: Handle timeouts gracefully
public void withTimeoutHandling(Semaphore semaphore) {
try {
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
performTimeSensitiveOperation();
} finally {
semaphore.release();
}
} else {
handleTimeout();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleInterruption();
}
}
// ❌ BAD: Might not release on exception
public void badPractice(Semaphore semaphore) throws InterruptedException {
semaphore.acquire();
performRiskyOperation(); // If this throws, semaphore won't be released
semaphore.release();
}
// Helper class for auto-closeable pattern
static class SemaphoreResource implements AutoCloseable {
private final Semaphore semaphore;
private boolean acquired = false;
public SemaphoreResource(Semaphore semaphore) throws InterruptedException {
this.semaphore = semaphore;
this.semaphore.acquire();
this.acquired = true;
}
public void use() {
if (!acquired) {
throw new IllegalStateException("Resource not acquired");
}
// Use the resource
}
@Override
public void close() {
if (acquired) {
semaphore.release();
acquired = false;
}
}
}
private void performCriticalOperation() {
// Critical section
}
private void performTimeSensitiveOperation() {
// Time-sensitive operation
}
private void performRiskyOperation() {
// Operation that might throw exception
}
private void handleTimeout() {
System.out.println("Operation timed out");
}
private void handleInterruption() {
System.out.println("Operation was interrupted");
}
}
Key Benefits of Semaphore for Resource Pooling
| Feature | Benefit | Use Case |
|---|---|---|
| Access Control | Limits concurrent access | Database connections, API calls |
| Fairness | Prevents starvation | Critical resources |
| Timeout Support | Avoids indefinite blocking | Responsive applications |
| Monitoring | Track usage statistics | Capacity planning |
| Flexibility | Dynamic permit management | Adaptive resource allocation |
Semaphores provide a robust mechanism for controlling access to limited resources, making them ideal for implementing resource pools, rate limiters, and bounded buffers in concurrent applications.