Semaphore for Resource Pooling in Java

Comprehensive guide to using Semaphore for controlling access to limited resources and implementing resource pools.

1. Semaphore Basics

Basic Semaphore Usage

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/**
 * Demonstrates basic {@link Semaphore} usage: ten threads compete for three permits,
 * so at most three of them run their simulated work at the same time.
 */
public class BasicSemaphoreExample {
    public static void main(String[] args) {
        // Create a semaphore with 3 permits
        Semaphore semaphore = new Semaphore(3);
        // Create multiple threads that need to acquire permits
        for (int i = 1; i <= 10; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " waiting for permit");
                    semaphore.acquire(); // Block until permit is available
                    // From here on the permit is held, so it must be returned on every
                    // exit path -- including an interrupt during the simulated work.
                    try {
                        System.out.println("Thread " + threadId + " acquired permit");
                        // Simulate work with the resource
                        Thread.sleep(2000);
                        System.out.println("Thread " + threadId + " releasing permit");
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can see the interrupt.
                    Thread.currentThread().interrupt();
                }
            }).start();
            // Stagger thread creation
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop spawning threads.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

Semaphore with Try-Acquire

/**
 * Shows the non-blocking and timed variants of {@link Semaphore#tryAcquire()}.
 * Each access method takes a permit, "uses" the resource, and always gives the
 * permit back before returning.
 */
public class TryAcquireSemaphore {
    private final Semaphore semaphore;

    /**
     * @param permits number of permits the guarded resource starts with
     */
    public TryAcquireSemaphore(int permits) {
        this.semaphore = new Semaphore(permits);
    }

    /**
     * Attempts to use the resource without waiting.
     *
     * @return {@code true} if a permit was obtained (and released again), {@code false} otherwise
     */
    public boolean tryAccessResource() {
        final String worker = Thread.currentThread().getName();
        // Guard clause: give up immediately when no permit is free.
        if (!semaphore.tryAcquire()) {
            System.out.println(worker + " could not acquire permit");
            return false;
        }
        try {
            System.out.println(worker + " acquired permit");
            // Use the resource
            return true;
        } finally {
            semaphore.release();
        }
    }

    /**
     * Attempts to use the resource, waiting up to the given time for a permit.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if a permit was obtained within the timeout, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean tryAccessResourceWithTimeout(long timeout, TimeUnit unit)
            throws InterruptedException {
        final String worker = Thread.currentThread().getName();
        final boolean gotPermit = semaphore.tryAcquire(timeout, unit);
        if (!gotPermit) {
            System.out.println(worker + " timeout waiting for permit");
            return false;
        }
        try {
            System.out.println(worker + " acquired permit after waiting");
            // Use the resource
            return true;
        } finally {
            semaphore.release();
        }
    }

    /**
     * Walk-through: the caller drains a two-permit semaphore, a second thread then
     * tries an immediate acquire followed by a one-second timed acquire, and the
     * caller releases a single permit after half a second.
     *
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void demonstrateTryAcquire() throws InterruptedException {
        final Semaphore permits = new Semaphore(2);
        // Take both permits so the prober starts against an empty semaphore.
        permits.acquire(2);

        Runnable probe = () -> {
            boolean immediate = permits.tryAcquire();
            System.out.println("Try acquire result: " + immediate);
            try {
                boolean timed = permits.tryAcquire(1, TimeUnit.SECONDS);
                System.out.println("Try acquire with timeout result: " + timed);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread prober = new Thread(probe);
        prober.start();

        Thread.sleep(500);
        // Hand back one permit while the prober is inside its timed wait.
        permits.release();
        prober.join();
    }
}

2. Database Connection Pool

Basic Connection Pool with Semaphore

/**
 * Fixed-size connection pool guarded by a fair {@link Semaphore}.
 *
 * <p>The semaphore holds one permit per pooled connection; the two lists track which
 * connections are idle and which are checked out. All list access is serialized on
 * the pool's monitor. Once {@link #closeAllConnections()} has run, the pool rejects
 * further checkouts with {@link IllegalStateException}.
 */
public class DatabaseConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> availableConnections;
    private final List<Connection> usedConnections;
    private final int poolSize;
    // Set by closeAllConnections(); volatile so the pre-acquire check sees it without the lock.
    private volatile boolean closed = false;

    /**
     * Creates the pool and eagerly opens {@code poolSize} connections.
     *
     * @param poolSize number of connections (and semaphore permits) in the pool
     */
    public DatabaseConnectionPool(int poolSize) {
        this.poolSize = poolSize;
        this.semaphore = new Semaphore(poolSize, true); // Fair semaphore
        this.availableConnections = new ArrayList<>(poolSize);
        this.usedConnections = new ArrayList<>(poolSize);
        initializePool();
    }

    /** Fills the idle list with {@code poolSize} freshly created connections. */
    private void initializePool() {
        for (int i = 0; i < poolSize; i++) {
            availableConnections.add(createConnection());
        }
        System.out.println("Initialized connection pool with " + poolSize + " connections");
    }

    /**
     * Checks out a connection, blocking until a permit is available.
     *
     * @return a connection that must be handed back via {@link #releaseConnection(Connection)}
     * @throws InterruptedException  if interrupted while waiting for a permit
     * @throws IllegalStateException if the pool has been closed
     */
    public Connection getConnection() throws InterruptedException {
        ensureOpen();
        semaphore.acquire(); // Wait for available permit
        return getAvailableConnection();
    }

    /**
     * Checks out a connection, waiting at most the given time for a permit.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return a connection that must be handed back via {@link #releaseConnection(Connection)}
     * @throws InterruptedException  if interrupted while waiting for a permit
     * @throws TimeoutException      if no permit became available in time
     * @throws IllegalStateException if the pool has been closed
     */
    public Connection getConnection(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        ensureOpen();
        if (semaphore.tryAcquire(timeout, unit)) {
            return getAvailableConnection();
        } else {
            throw new TimeoutException("Timeout waiting for database connection");
        }
    }

    /** Fails fast when the pool has already been closed. */
    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("Connection pool is closed");
        }
    }

    /**
     * Moves one connection from the idle list to the used list. The caller must
     * already hold a permit; the permit is given back here if the pool turns out
     * to have been closed in the meantime.
     */
    private synchronized Connection getAvailableConnection() {
        if (closed) {
            // The lists were cleared by closeAllConnections(); without this guard the
            // remove() below would throw IndexOutOfBoundsException and leak the permit.
            semaphore.release();
            throw new IllegalStateException("Connection pool is closed");
        }
        Connection connection = availableConnections.remove(availableConnections.size() - 1);
        usedConnections.add(connection);
        System.out.println("Connection acquired. Available: " + availableConnections.size() +
                ", Used: " + usedConnections.size());
        return connection;
    }

    /**
     * Returns a checked-out connection to the pool and frees its permit. Connections
     * that are not currently tracked as used (unknown, already released, or released
     * after the pool was closed) are ignored.
     *
     * @param connection the connection previously obtained from this pool
     */
    public synchronized void releaseConnection(Connection connection) {
        if (usedConnections.remove(connection)) {
            availableConnections.add(connection);
            semaphore.release(); // Release the permit
            System.out.println("Connection released. Available: " + availableConnections.size() +
                    ", Used: " + usedConnections.size());
        }
    }

    /** Closes every idle and checked-out connection and marks the pool as closed. */
    public synchronized void closeAllConnections() {
        closed = true;
        for (Connection conn : availableConnections) {
            closeConnection(conn);
        }
        for (Connection conn : usedConnections) {
            closeConnection(conn);
        }
        availableConnections.clear();
        usedConnections.clear();
    }

    /** @return number of idle connections */
    public synchronized int getAvailableConnectionsCount() {
        return availableConnections.size();
    }

    /** @return number of checked-out connections */
    public synchronized int getUsedConnectionsCount() {
        return usedConnections.size();
    }

    // Mock connection implementation
    private Connection createConnection() {
        return new MockConnection();
    }

    private void closeConnection(Connection connection) {
        // Close the connection
        System.out.println("Closing connection: " + connection);
    }

    // Mock Connection class
    static class MockConnection implements Connection {
        private final String id = UUID.randomUUID().toString().substring(0, 8);

        @Override
        public String toString() {
            return "Connection-" + id;
        }

        // Implement other Connection methods as needed...
        @Override public void close() {}
        @Override public boolean isClosed() { return false; }
        // ... other Connection interface methods
    }

    // Demonstration
    public static void main(String[] args) throws Exception {
        DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("Task " + taskId + " waiting for connection");
                    Connection conn = pool.getConnection();
                    try {
                        System.out.println("Task " + taskId + " using connection: " + conn);
                        // Simulate database operation
                        Thread.sleep(2000);
                        System.out.println("Task " + taskId + " completed work");
                    } finally {
                        pool.releaseConnection(conn);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Thread.sleep(500); // Stagger task submission
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        pool.closeAllConnections();
    }
}

3. Advanced Connection Pool with Validation

Robust Connection Pool

public class RobustConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<PooledConnection> availableConnections;
private final Set<PooledConnection> usedConnections;
private final int maxPoolSize;
private final ConnectionFactory connectionFactory;
private volatile boolean closed = false;
public RobustConnectionPool(int maxPoolSize, ConnectionFactory connectionFactory) {
this.maxPoolSize = maxPoolSize;
this.connectionFactory = connectionFactory;
this.semaphore = new Semaphore(maxPoolSize, true);
this.availableConnections = new LinkedBlockingQueue<>(maxPoolSize);
this.usedConnections = new ConcurrentHashSet<>();
initializePool();
}
private void initializePool() {
for (int i = 0; i < Math.min(2, maxPoolSize); i++) { // Start with 2 connections
createAndAddConnection();
}
}
private void createAndAddConnection() {
try {
Connection rawConnection = connectionFactory.createConnection();
PooledConnection pooledConnection = new PooledConnection(rawConnection);
availableConnections.offer(pooledConnection);
} catch (SQLException e) {
System.err.println("Failed to create connection: " + e.getMessage());
}
}
public Connection getConnection() throws SQLException, InterruptedException {
if (closed) {
throw new SQLException("Connection pool is closed");
}
semaphore.acquire();
return getAvailableConnection();
}
public Connection getConnection(long timeout, TimeUnit unit) 
throws SQLException, InterruptedException, TimeoutException {
if (closed) {
throw new SQLException("Connection pool is closed");
}
if (!semaphore.tryAcquire(timeout, unit)) {
throw new TimeoutException("Timeout waiting for connection");
}
return getAvailableConnection();
}
private Connection getAvailableConnection() throws SQLException {
try {
PooledConnection connection = availableConnections.poll(1, TimeUnit.SECONDS);
if (connection != null && connection.isValid(1)) {
usedConnections.add(connection);
return connection;
} else {
// Create new connection if none available or existing is invalid
Connection newConnection = connectionFactory.createConnection();
PooledConnection pooledConnection = new PooledConnection(newConnection);
usedConnections.add(pooledConnection);
return pooledConnection;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException("Interrupted while waiting for connection", e);
} catch (SQLException e) {
semaphore.release(); // Release permit if connection creation failed
throw e;
}
}
private void returnConnection(PooledConnection connection) {
if (usedConnections.remove(connection)) {
if (connection.isValid(1) && availableConnections.size() < maxPoolSize) {
availableConnections.offer(connection);
} else {
closeConnectionQuietly(connection.getRealConnection());
// Create replacement connection if pool is not full
if (availableConnections.size() < maxPoolSize && !closed) {
createAndAddConnection();
}
}
semaphore.release();
}
}
public void close() {
closed = true;
// Close all connections
availableConnections.forEach(conn -> 
closeConnectionQuietly(conn.getRealConnection()));
availableConnections.clear();
usedConnections.forEach(conn -> 
closeConnectionQuietly(conn.getRealConnection()));
usedConnections.clear();
}
private void closeConnectionQuietly(Connection connection) {
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
// Log but don't throw
System.err.println("Error closing connection: " + e.getMessage());
}
}
public int getAvailableCount() {
return availableConnections.size();
}
public int getUsedCount() {
return usedConnections.size();
}
public int getTotalCount() {
return getAvailableCount() + getUsedCount();
}
// PooledConnection wrapper
private class PooledConnection implements Connection {
private final Connection realConnection;
private volatile boolean isClosed = false;
public PooledConnection(Connection realConnection) {
this.realConnection = realConnection;
}
public Connection getRealConnection() {
return realConnection;
}
@Override
public void close() throws SQLException {
if (!isClosed) {
isClosed = true;
returnConnection(this);
}
}
public boolean isValid(int timeout) {
try {
return !realConnection.isClosed() && realConnection.isValid(timeout);
} catch (SQLException e) {
return false;
}
}
// Delegate all other Connection methods to realConnection
@Override public Statement createStatement() throws SQLException { 
return realConnection.createStatement(); 
}
@Override public PreparedStatement prepareStatement(String sql) throws SQLException { 
return realConnection.prepareStatement(sql); 
}
// ... implement all other Connection interface methods
}
// Connection factory interface
public interface ConnectionFactory {
Connection createConnection() throws SQLException;
}
}

4. Thread Pool with Semaphore Control

Bounded Thread Pool

/**
 * Fixed-size thread pool whose tasks additionally pass through a {@link Semaphore},
 * so that at most {@code maxConcurrentTasks} task bodies execute at the same time
 * even when the pool has more worker threads than that.
 *
 * <p>The permit is acquired on the worker thread, immediately before the task body
 * runs, and released in a {@code finally} when the body finishes or throws.
 */
public class BoundedThreadPool {
    private final ExecutorService executor;
    private final Semaphore semaphore;
    private final int maxConcurrentTasks;

    /**
     * @param poolSize           number of worker threads
     * @param maxConcurrentTasks number of task bodies allowed to run simultaneously
     */
    public BoundedThreadPool(int poolSize, int maxConcurrentTasks) {
        this.executor = Executors.newFixedThreadPool(poolSize);
        this.semaphore = new Semaphore(maxConcurrentTasks);
        this.maxConcurrentTasks = maxConcurrentTasks;
    }

    /**
     * Submits a value-returning task that runs while holding one permit.
     *
     * @param task the work to run
     * @return a future for the task's result
     */
    public <T> Future<T> submit(Callable<T> task) {
        return executor.submit(() -> {
            semaphore.acquire();
            try {
                return task.call();
            } finally {
                semaphore.release();
            }
        });
    }

    /**
     * Submits a task without a result that runs while holding one permit.
     *
     * @param task the work to run
     * @return a future that completes when the task has finished
     */
    public Future<?> submit(Runnable task) {
        // Route through the Callable overload: a Runnable lambda may not throw the
        // checked InterruptedException raised by semaphore.acquire(), so wrapping
        // the acquire directly in a Runnable does not compile.
        return submit(Executors.callable(task));
    }

    /** Stops accepting new tasks; already submitted tasks still run. */
    public void shutdown() {
        executor.shutdown();
    }

    /**
     * Waits for submitted tasks to finish after {@link #shutdown()}.
     *
     * @return {@code true} if the executor terminated before the timeout elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }

    /** @return permits currently not held by any running task */
    public int getAvailablePermits() {
        return semaphore.availablePermits();
    }

    /** @return estimate of the number of tasks waiting for a permit */
    public int getQueueLength() {
        return semaphore.getQueueLength();
    }

    // Demonstration
    public static void main(String[] args) throws Exception {
        BoundedThreadPool boundedPool = new BoundedThreadPool(10, 3); // Max 3 concurrent tasks
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            Future<String> future = boundedPool.submit(() -> {
                System.out.println("Task " + taskId + " started. Available permits: " +
                        boundedPool.getAvailablePermits());
                Thread.sleep(3000); // Simulate work
                return "Result from task " + taskId;
            });
            futures.add(future);
            Thread.sleep(500); // Stagger submission
        }
        // Collect results
        for (Future<String> future : futures) {
            System.out.println("Completed: " + future.get());
        }
        boundedPool.shutdown();
        boundedPool.awaitTermination(1, TimeUnit.MINUTES);
    }
}

5. Rate Limiting with Semaphore

API Rate Limiter

/**
 * Simple per-second rate limiter: a {@link Semaphore} holds the permits for the
 * current window and a single scheduler thread tops it back up to the configured
 * maximum once every second.
 */
public class RateLimiter {
    private final Semaphore semaphore;
    private final int maxPermits;
    private final ScheduledExecutorService scheduler;

    /**
     * @param permitsPerSecond number of permits available in each one-second window
     */
    public RateLimiter(int permitsPerSecond) {
        this.maxPermits = permitsPerSecond;
        this.semaphore = new Semaphore(permitsPerSecond);
        this.scheduler = Executors.newScheduledThreadPool(1);
        // First top-up after one second, then once every second.
        scheduler.scheduleAtFixedRate(this::replenish, 1, 1, TimeUnit.SECONDS);
    }

    /** Releases however many permits are missing to get back to {@code maxPermits}. */
    private void replenish() {
        int missing = maxPermits - semaphore.availablePermits();
        if (missing > 0) {
            semaphore.release(missing);
        }
    }

    /** @return {@code true} if a permit was taken without waiting */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    /**
     * Takes a permit, blocking until one is available.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }

    /**
     * Takes a permit, waiting at most the given time.
     *
     * @return {@code true} if a permit was taken before the timeout elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return semaphore.tryAcquire(timeout, unit);
    }

    /** Stops the replenishment scheduler. */
    public void shutdown() {
        scheduler.shutdown();
    }

    // Demonstration
    public static void main(String[] args) throws Exception {
        RateLimiter limiter = new RateLimiter(5); // 5 requests per second
        ExecutorService workers = Executors.newFixedThreadPool(10);
        int requestId = 1;
        while (requestId <= 20) {
            final int id = requestId;
            workers.submit(() -> {
                boolean allowed = limiter.tryAcquire();
                if (!allowed) {
                    System.out.println("Request " + id + " rate limited");
                    return;
                }
                System.out.println("Request " + id + " allowed at " +
                        System.currentTimeMillis());
                // Process request
            });
            Thread.sleep(100); // Stagger requests
            requestId++;
        }
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
        limiter.shutdown();
    }
}

Token Bucket Rate Limiter

/**
 * Token-bucket rate limiter backed by a {@link Semaphore}. The bucket starts full
 * with {@code capacity} tokens; a single scheduler thread adds one token every
 * {@code refillUnit / refillRate} as long as the bucket is below capacity.
 */
public class TokenBucketRateLimiter {
    private final Semaphore semaphore;
    private final int capacity;
    private final ScheduledExecutorService refillScheduler;

    /**
     * @param capacity   maximum (and initial) number of tokens in the bucket
     * @param refillRate tokens added per {@code refillUnit}; must be positive
     * @param refillUnit time unit that {@code refillRate} refers to
     * @throws IllegalArgumentException if {@code refillRate} is not positive, or is so
     *         high that the interval between two tokens would be below one nanosecond
     */
    public TokenBucketRateLimiter(int capacity, int refillRate, TimeUnit refillUnit) {
        // Validate before any resource is created: a zero rate would otherwise fail
        // with ArithmeticException in the division below, and a zero period would be
        // rejected by the scheduler only after the executor already exists.
        if (refillRate <= 0) {
            throw new IllegalArgumentException("refillRate must be positive: " + refillRate);
        }
        // Calculate refill period
        long refillPeriod = refillUnit.toNanos(1) / refillRate;
        if (refillPeriod <= 0) {
            throw new IllegalArgumentException("refillRate " + refillRate + " per " + refillUnit
                    + " needs a refill period below one nanosecond");
        }
        this.capacity = capacity;
        this.semaphore = new Semaphore(capacity);
        this.refillScheduler = Executors.newScheduledThreadPool(1);
        // Schedule token refill
        refillScheduler.scheduleAtFixedRate(() -> {
            // Only this scheduler thread adds tokens, so the check cannot overfill the bucket.
            if (semaphore.availablePermits() < capacity) {
                semaphore.release();
            }
        }, refillPeriod, refillPeriod, TimeUnit.NANOSECONDS);
    }

    /** @return {@code true} if one token was taken without waiting */
    public boolean tryConsume() {
        return semaphore.tryAcquire();
    }

    /**
     * Takes one token, blocking until one is available.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void consume() throws InterruptedException {
        semaphore.acquire();
    }

    /**
     * @param permits number of tokens to take at once
     * @return {@code true} if all requested tokens were taken without waiting
     */
    public boolean tryConsume(int permits) {
        return semaphore.tryAcquire(permits);
    }

    /**
     * Takes the given number of tokens, blocking until all are available.
     *
     * @param permits number of tokens to take at once
     * @throws InterruptedException if interrupted while waiting
     */
    public void consume(int permits) throws InterruptedException {
        semaphore.acquire(permits);
    }

    /** @return number of tokens currently in the bucket */
    public int getAvailableTokens() {
        return semaphore.availablePermits();
    }

    /** Stops the refill scheduler. */
    public void shutdown() {
        refillScheduler.shutdown();
    }
}

6. Resource Pool for Expensive Objects

Object Pool with Semaphore

/**
 * Generic pool for objects that are expensive to create.
 *
 * <p>A fair {@link Semaphore} with {@code maxSize} permits bounds how many objects
 * are checked out at once. Idle objects wait in a bounded queue; a caller holding a
 * permit reuses a valid idle object or asks the {@link ObjectFactory} for a new one.
 *
 * @param <T> type of the pooled objects
 */
public class ObjectPool<T> {
    private final Semaphore semaphore;
    private final BlockingQueue<T> availableObjects;
    private final Set<T> usedObjects;
    private final ObjectFactory<T> objectFactory;
    private final int maxSize;
    private volatile boolean closed = false;

    /**
     * @param maxSize       maximum number of objects checked out at once
     * @param objectFactory creates, validates and destroys pooled objects
     */
    public ObjectPool(int maxSize, ObjectFactory<T> objectFactory) {
        this.maxSize = maxSize;
        this.objectFactory = objectFactory;
        this.semaphore = new Semaphore(maxSize, true);
        this.availableObjects = new LinkedBlockingQueue<>(maxSize);
        this.usedObjects = ConcurrentHashMap.newKeySet();
        initializePool();
    }

    /** Pre-creates up to three idle objects. */
    private void initializePool() {
        for (int i = 0; i < Math.min(3, maxSize); i++) {
            createAndAddObject();
        }
    }

    /**
     * Creates one object and parks it in the idle queue. Creation failures are
     * logged and otherwise ignored; an object the queue has no room for is destroyed.
     */
    private void createAndAddObject() {
        try {
            T object = objectFactory.create();
            if (!availableObjects.offer(object)) {
                // Queue already full: destroy instead of silently dropping the object.
                objectFactory.destroy(object);
            }
        } catch (Exception e) {
            System.err.println("Failed to create object: " + e.getMessage());
        }
    }

    /**
     * Checks out an object, blocking until a permit is available.
     *
     * @return an object that must be handed back via {@link #release(Object)}
     * @throws InterruptedException  if interrupted while waiting for a permit
     * @throws IllegalStateException if the pool is closed
     */
    public T acquire() throws InterruptedException {
        if (closed) {
            throw new IllegalStateException("Pool is closed");
        }
        semaphore.acquire();
        return acquireObject();
    }

    /**
     * Checks out an object, waiting at most the given time for a permit.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return an object that must be handed back via {@link #release(Object)}
     * @throws InterruptedException  if interrupted while waiting for a permit
     * @throws TimeoutException      if no permit became available in time
     * @throws IllegalStateException if the pool is closed
     */
    public T acquire(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (closed) {
            throw new IllegalStateException("Pool is closed");
        }
        if (!semaphore.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timeout waiting for object");
        }
        return acquireObject();
    }

    /**
     * Produces an object for a caller that already holds a permit. If the factory
     * throws, the permit is released and the failure is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private T acquireObject() {
        try {
            // Non-blocking poll: release() re-queues before it frees the permit, so
            // waiting here could never make an extra object appear.
            T candidate = availableObjects.poll();
            if (candidate != null) {
                if (objectFactory.validate(candidate)) {
                    usedObjects.add(candidate);
                    return candidate;
                }
                // Idle object went bad: destroy it rather than leak it.
                objectFactory.destroy(candidate);
            }
            // Create new object
            T newObject = objectFactory.create();
            usedObjects.add(newObject);
            return newObject;
        } catch (RuntimeException e) {
            semaphore.release(); // Release permit on creation failure
            throw new RuntimeException("Failed to acquire object", e);
        }
    }

    /**
     * Returns a checked-out object. Valid objects are re-queued; invalid ones are
     * destroyed and, if the pool is open and has room, replaced. Objects this pool
     * is not tracking as used are ignored.
     *
     * @param object an object previously obtained from {@link #acquire()}
     */
    public void release(T object) {
        if (usedObjects.remove(object)) {
            if (objectFactory.validate(object) && availableObjects.size() < maxSize) {
                availableObjects.offer(object);
            } else {
                objectFactory.destroy(object);
                if (!closed && availableObjects.size() < maxSize) {
                    createAndAddObject();
                }
            }
            semaphore.release();
        }
    }

    /** Marks the pool closed and destroys every idle and checked-out object. */
    public void close() {
        closed = true;
        // Destroy all objects
        availableObjects.forEach(objectFactory::destroy);
        availableObjects.clear();
        usedObjects.forEach(objectFactory::destroy);
        usedObjects.clear();
    }

    /** @return number of idle objects */
    public int getAvailableCount() {
        return availableObjects.size();
    }

    /** @return number of checked-out objects */
    public int getUsedCount() {
        return usedObjects.size();
    }

    // Object factory interface
    public interface ObjectFactory<T> {
        T create();
        boolean validate(T object);
        void destroy(T object);
    }

    // Example: Expensive parser pool
    public static class ParserPool extends ObjectPool<DocumentBuilder> {
        public ParserPool(int maxSize) {
            super(maxSize, new ParserFactory());
        }

        /**
         * Parses {@code xml} with a pooled parser and returns the text content of
         * the document element.
         */
        public String parseXml(String xml) throws Exception {
            DocumentBuilder parser = acquire();
            try {
                Document doc = parser.parse(new InputSource(new StringReader(xml)));
                return doc.getDocumentElement().getTextContent();
            } finally {
                release(parser);
            }
        }
    }

    static class ParserFactory implements ObjectFactory<DocumentBuilder> {
        @Override
        public DocumentBuilder create() {
            try {
                // NOTE(review): the factory is used with default settings. If parseXml()
                // can receive untrusted XML, disable DTDs/external entities here (XXE).
                DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
                return factory.newDocumentBuilder();
            } catch (ParserConfigurationException e) {
                throw new RuntimeException("Failed to create parser", e);
            }
        }

        @Override
        public boolean validate(DocumentBuilder parser) {
            return parser != null;
        }

        @Override
        public void destroy(DocumentBuilder parser) {
            // Parser doesn't need explicit cleanup
        }
    }
}

7. Producer-Consumer with Bounded Buffer

Bounded Buffer with Semaphore

/**
 * Classic two-semaphore bounded buffer. {@code emptySlots} counts free capacity,
 * {@code fullSlots} counts stored items, and the queue itself is only touched while
 * holding this object's monitor.
 *
 * @param <T> element type
 */
public class BoundedBuffer<T> {
    private final Semaphore emptySlots;
    private final Semaphore fullSlots;
    private final Queue<T> buffer;
    private final int capacity;

    /**
     * @param capacity maximum number of items the buffer can hold
     */
    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
        this.buffer = new LinkedList<>();
        this.fullSlots = new Semaphore(0);
        this.emptySlots = new Semaphore(capacity);
    }

    /** Appends an item to the queue under the monitor. */
    private void enqueue(T item) {
        synchronized (this) {
            buffer.offer(item);
        }
    }

    /** Removes and returns the head of the queue under the monitor. */
    private T dequeue() {
        synchronized (this) {
            return buffer.poll();
        }
    }

    /**
     * Stores an item, blocking while the buffer is full.
     *
     * @throws InterruptedException if interrupted while waiting for a free slot
     */
    public void put(T item) throws InterruptedException {
        emptySlots.acquire();   // reserve a free slot
        enqueue(item);
        fullSlots.release();    // announce the new item
    }

    /**
     * Removes the oldest item, blocking while the buffer is empty.
     *
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public T take() throws InterruptedException {
        fullSlots.acquire();    // claim an item
        T head = dequeue();
        emptySlots.release();   // announce the freed slot
        return head;
    }

    /**
     * Stores an item only if a slot is free right now.
     *
     * @return {@code true} if the item was stored
     */
    public boolean offer(T item) {
        if (!emptySlots.tryAcquire()) {
            return false;
        }
        enqueue(item);
        fullSlots.release();
        return true;
    }

    /**
     * Removes the oldest item only if one is present right now.
     *
     * @return the item, or {@code null} if nothing could be claimed
     */
    public T poll() {
        if (!fullSlots.tryAcquire()) {
            return null;
        }
        T head = dequeue();
        emptySlots.release();
        return head;
    }

    /** @return number of items currently in the queue */
    public int size() {
        synchronized (this) {
            return buffer.size();
        }
    }

    /** @return {@code true} if the queue holds no items */
    public boolean isEmpty() {
        synchronized (this) {
            return buffer.isEmpty();
        }
    }

    /** @return {@code true} if no free-slot permit is available */
    public boolean isFull() {
        return emptySlots.availablePermits() == 0;
    }

    // Demonstration
    public static void main(String[] args) throws Exception {
        BoundedBuffer<Integer> shared = new BoundedBuffer<>(5);

        // Producer: ten items, one every 100 ms
        Runnable produce = () -> {
            try {
                int next = 1;
                while (next <= 10) {
                    shared.put(next);
                    System.out.println("Produced: " + next + " (Buffer size: " + shared.size() + ")");
                    Thread.sleep(100);
                    next++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Consumer: ten items, one every 200 ms
        Runnable consume = () -> {
            try {
                int remaining = 10;
                while (remaining > 0) {
                    Integer taken = shared.take();
                    System.out.println("Consumed: " + taken + " (Buffer size: " + shared.size() + ")");
                    Thread.sleep(200);
                    remaining--;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread producer = new Thread(produce);
        Thread consumer = new Thread(consume);
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}

8. Best Practices and Common Patterns

Error Handling and Resource Management

/**
 * Side-by-side examples of correct and incorrect permit handling with
 * {@link Semaphore}.
 */
public class SemaphoreBestPractices {
    // ✅ GOOD: Acquire outside the try, always release in finally block
    /**
     * Runs the critical operation while holding one permit. If the thread is
     * interrupted while waiting, the interrupt flag is restored and the method
     * returns without running the operation and without touching the permit count.
     *
     * @param semaphore the semaphore guarding the critical operation
     */
    public void properResourceHandling(Semaphore semaphore) {
        try {
            // Acquire BEFORE entering the try/finally that releases: if acquire() is
            // interrupted no permit is held, and releasing one anyway would silently
            // raise the semaphore's permit count above its intended limit.
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Handle interruption
            return;
        }
        try {
            // Use the resource
            performCriticalOperation();
        } finally {
            semaphore.release();
        }
    }

    // ✅ GOOD: Use try-with-resources pattern
    public void withAutoCloseable(Semaphore semaphore) throws Exception {
        try (SemaphoreResource resource = new SemaphoreResource(semaphore)) {
            resource.use();
        } // Semaphore automatically released
    }

    // ✅ GOOD: Handle timeouts gracefully
    public void withTimeoutHandling(Semaphore semaphore) {
        try {
            if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
                try {
                    performTimeSensitiveOperation();
                } finally {
                    semaphore.release();
                }
            } else {
                handleTimeout();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handleInterruption();
        }
    }

    // ❌ BAD: Might not release on exception
    public void badPractice(Semaphore semaphore) throws InterruptedException {
        semaphore.acquire();
        performRiskyOperation(); // If this throws, semaphore won't be released
        semaphore.release();
    }

    // Helper class for auto-closeable pattern
    /**
     * Holds one permit for its lifetime: acquired in the constructor, released by
     * {@link #close()}. If the constructor is interrupted, no instance exists and
     * no permit is held.
     */
    static class SemaphoreResource implements AutoCloseable {
        private final Semaphore semaphore;
        private boolean acquired = false;

        public SemaphoreResource(Semaphore semaphore) throws InterruptedException {
            this.semaphore = semaphore;
            this.semaphore.acquire();
            this.acquired = true;
        }

        public void use() {
            if (!acquired) {
                throw new IllegalStateException("Resource not acquired");
            }
            // Use the resource
        }

        /** Releases the permit once; repeated calls have no further effect. */
        @Override
        public void close() {
            if (acquired) {
                semaphore.release();
                acquired = false;
            }
        }
    }

    private void performCriticalOperation() {
        // Critical section
    }

    private void performTimeSensitiveOperation() {
        // Time-sensitive operation
    }

    private void performRiskyOperation() {
        // Operation that might throw exception
    }

    private void handleTimeout() {
        System.out.println("Operation timed out");
    }

    private void handleInterruption() {
        System.out.println("Operation was interrupted");
    }
}

Key Benefits of Semaphore for Resource Pooling

| Feature | Benefit | Use Case |
|---|---|---|
| Access Control | Limits concurrent access | Database connections, API calls |
| Fairness | Prevents starvation | Critical resources |
| Timeout Support | Avoids indefinite blocking | Responsive applications |
| Monitoring | Track usage statistics | Capacity planning |
| Flexibility | Dynamic permit management | Adaptive resource allocation |

Semaphores provide a robust mechanism for controlling access to limited resources, making them ideal for implementing resource pools, rate limiters, and bounded buffers in concurrent applications.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper