Overview
Google Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. You can use it to perform work asynchronously outside of a user request.
Key Concepts
- Queue: Container for tasks
- Task: Unit of work to be executed
- Handler: Service that processes tasks
- HTTP Target: HTTP endpoint that receives tasks
- App Engine Target: App Engine service that receives tasks
Setup and Dependencies
1. Maven Dependencies
<properties>
<google.cloud.tasks.version>2.36.0</google.cloud.tasks.version>
</properties>
<dependencies>
<!-- Cloud Tasks Client -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-tasks</artifactId>
<version>${google.cloud.tasks.version}</version>
</dependency>
<!-- For HTTP client in task handlers -->
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>1.43.3</version>
</dependency>
<!-- For JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- For authentication -->
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.19.0</version>
</dependency>
</dependencies>
2. Authentication Setup
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.CloudTasksSettings;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.IOException;
public class CloudTasksConfig {
public static CloudTasksClient createClient() throws IOException {
// Method 1: Use default credentials (for GCP environments)
CloudTasksSettings settings = CloudTasksSettings.newBuilder().build();
return CloudTasksClient.create(settings);
}
public static CloudTasksClient createClientWithServiceAccount(String credentialsPath)
throws IOException {
// Method 2: Use service account credentials
GoogleCredentials credentials = GoogleCredentials
.fromStream(new FileInputStream(credentialsPath));
CloudTasksSettings settings = CloudTasksSettings.newBuilder()
.setCredentialsProvider(() -> credentials)
.build();
return CloudTasksClient.create(settings);
}
public static CloudTasksClient createClientWithImpersonation(
String targetServiceAccount) throws IOException {
// Method 3: Use credential impersonation
GoogleCredentials sourceCredentials = GoogleCredentials.getApplicationDefault();
GoogleCredentials impersonatedCredentials = sourceCredentials
.createDelegated(targetServiceAccount);
CloudTasksSettings settings = CloudTasksSettings.newBuilder()
.setCredentialsProvider(() -> impersonatedCredentials)
.build();
return CloudTasksClient.create(settings);
}
}
Basic Queue Management
1. Queue Operations
import com.google.cloud.tasks.v2.*;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
public class QueueManager {
private final CloudTasksClient client;
private final String projectId;
private final String location;
public QueueManager(CloudTasksClient client, String projectId, String location) {
this.client = client;
this.projectId = projectId;
this.location = location;
}
// Create a queue
public Queue createQueue(String queueName, QueueConfig config) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
Queue.Builder queueBuilder = Queue.newBuilder()
.setName(queuePath);
// Set retry config
if (config.getMaxAttempts() > 0) {
RetryConfig retryConfig = RetryConfig.newBuilder()
.setMaxAttempts(config.getMaxAttempts())
.setMaxRetryDuration(Durations.fromSeconds(config.getMaxRetryDurationSeconds()))
.setMinBackoff(Durations.fromSeconds(config.getMinBackoffSeconds()))
.setMaxBackoff(Durations.fromSeconds(config.getMaxBackoffSeconds()))
.build();
queueBuilder.setRetryConfig(retryConfig);
}
// Set rate limits
if (config.getMaxDispatchesPerSecond() > 0) {
RateLimits rateLimits = RateLimits.newBuilder()
.setMaxDispatchesPerSecond(config.getMaxDispatchesPerSecond())
.setMaxConcurrentDispatches(config.getMaxConcurrentDispatches())
.build();
queueBuilder.setRateLimits(rateLimits);
}
Queue queue = queueBuilder.build();
return client.createQueue(LocationName.of(projectId, location), queue);
}
// Get a queue
public Queue getQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
return client.getQueue(queuePath);
}
// List all queues
public List<Queue> listQueues() {
List<Queue> queues = new ArrayList<>();
LocationName parent = LocationName.of(projectId, location);
for (Queue queue : client.listQueues(parent).iterateAll()) {
queues.add(queue);
}
return queues;
}
// Update a queue
public Queue updateQueue(String queueName, QueueConfig newConfig) {
Queue existingQueue = getQueue(queueName);
Queue.Builder updatedBuilder = existingQueue.toBuilder();
// Update retry config
RetryConfig retryConfig = RetryConfig.newBuilder()
.setMaxAttempts(newConfig.getMaxAttempts())
.setMaxRetryDuration(Durations.fromSeconds(newConfig.getMaxRetryDurationSeconds()))
.build();
updatedBuilder.setRetryConfig(retryConfig);
// Update rate limits
RateLimits rateLimits = RateLimits.newBuilder()
.setMaxDispatchesPerSecond(newConfig.getMaxDispatchesPerSecond())
.setMaxConcurrentDispatches(newConfig.getMaxConcurrentDispatches())
.build();
updatedBuilder.setRateLimits(rateLimits);
return client.updateQueue(updatedBuilder.build());
}
// Delete a queue
public void deleteQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
client.deleteQueue(queuePath);
}
// Pause a queue
public Queue pauseQueue(String queueName) {
Queue queue = getQueue(queueName);
Queue pausedQueue = queue.toBuilder().setState(Queue.State.PAUSED).build();
return client.updateQueue(pausedQueue);
}
// Resume a queue
public Queue resumeQueue(String queueName) {
Queue queue = getQueue(queueName);
Queue resumedQueue = queue.toBuilder().setState(Queue.State.RUNNING).build();
return client.updateQueue(resumedQueue);
}
// Configuration class for queue settings
public static class QueueConfig {
private int maxAttempts = 10;
private long maxRetryDurationSeconds = 3600; // 1 hour
private double maxDispatchesPerSecond = 500.0;
private int maxConcurrentDispatches = 1000;
private long minBackoffSeconds = 0; // Immediate retry
private long maxBackoffSeconds = 3600; // 1 hour
// Getters and setters
public int getMaxAttempts() { return maxAttempts; }
public void setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; }
public long getMaxRetryDurationSeconds() { return maxRetryDurationSeconds; }
public void setMaxRetryDurationSeconds(long maxRetryDurationSeconds) {
this.maxRetryDurationSeconds = maxRetryDurationSeconds;
}
public double getMaxDispatchesPerSecond() { return maxDispatchesPerSecond; }
public void setMaxDispatchesPerSecond(double maxDispatchesPerSecond) {
this.maxDispatchesPerSecond = maxDispatchesPerSecond;
}
public int getMaxConcurrentDispatches() { return maxConcurrentDispatches; }
public void setMaxConcurrentDispatches(int maxConcurrentDispatches) {
this.maxConcurrentDispatches = maxConcurrentDispatches;
}
public long getMinBackoffSeconds() { return minBackoffSeconds; }
public void setMinBackoffSeconds(long minBackoffSeconds) {
this.minBackoffSeconds = minBackoffSeconds;
}
public long getMaxBackoffSeconds() { return maxBackoffSeconds; }
public void setMaxBackoffSeconds(long maxBackoffSeconds) {
this.maxBackoffSeconds = maxBackoffSeconds;
}
}
}
Task Creation and Management
1. HTTP Task Creation
import com.google.cloud.tasks.v2.*;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
/**
 * Creates HTTP-target tasks in queues of one project and location.
 *
 * <p>All tasks are sent as {@code POST} requests. This class does not close the client.
 */
public class TaskCreator {
    private static final String CONTENT_TYPE_HEADER = "Content-Type";
    private static final String JSON_CONTENT_TYPE = "application/json";

    private final CloudTasksClient client;
    private final String projectId;
    private final String location;

    /**
     * @param client    client used for every call
     * @param projectId project that owns the queues
     * @param location  location of the queues
     */
    public TaskCreator(CloudTasksClient client, String projectId, String location) {
        this.client = client;
        this.projectId = projectId;
        this.location = location;
    }

    /**
     * Creates an unscheduled HTTP task with no extra headers.
     *
     * @param queueName short queue id
     * @param url       target URL
     * @param payload   request body; may be {@code null} or empty for no body
     * @return the created task
     */
    public Task createHttpTask(String queueName, String url, String payload) {
        return createHttpTask(queueName, url, payload, null, null);
    }

    /**
     * Creates an HTTP task with optional schedule time and headers.
     *
     * <p>A JSON {@code Content-Type} header is added unless the caller supplied one; header
     * names are compared case-insensitively so that e.g. {@code content-type} is not
     * duplicated.
     *
     * @param queueName    short queue id
     * @param url          target URL
     * @param payload      request body; may be {@code null} or empty for no body
     * @param scheduleTime earliest dispatch time, or {@code null} for none
     * @param headers      extra request headers, or {@code null} for none
     * @return the created task
     */
    public Task createHttpTask(String queueName, String url, String payload,
            Date scheduleTime, Map<String, String> headers) {
        Task.Builder taskBuilder = Task.newBuilder();
        if (scheduleTime != null) {
            // fromMillis keeps the millisecond part that a seconds-only Timestamp would drop.
            taskBuilder.setScheduleTime(Timestamps.fromMillis(scheduleTime.getTime()));
        }

        HttpRequest.Builder httpRequestBuilder = newPostRequest(url, payload);
        if (headers != null) {
            httpRequestBuilder.putAllHeaders(headers);
        }
        boolean hasContentType = httpRequestBuilder.getHeadersMap().keySet().stream()
                .anyMatch(CONTENT_TYPE_HEADER::equalsIgnoreCase);
        if (!hasContentType) {
            httpRequestBuilder.putHeaders(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE);
        }

        taskBuilder.setHttpRequest(httpRequestBuilder.build());
        return client.createTask(queuePath(queueName), taskBuilder.build());
    }

    /**
     * Creates a JSON HTTP task that carries an OIDC token for the given service account.
     *
     * @param queueName           short queue id
     * @param url                 target URL
     * @param payload             request body; may be {@code null} or empty for no body
     * @param serviceAccountEmail service account whose identity the token represents
     * @return the created task
     */
    public Task createHttpTaskWithOidcToken(String queueName, String url, String payload,
            String serviceAccountEmail) {
        OidcToken oidcToken = OidcToken.newBuilder()
                .setServiceAccountEmail(serviceAccountEmail)
                .build();
        HttpRequest httpRequest = newPostRequest(url, payload)
                .putHeaders(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE)
                .setOidcToken(oidcToken)
                .build();
        Task task = Task.newBuilder().setHttpRequest(httpRequest).build();
        return client.createTask(queuePath(queueName), task);
    }

    /**
     * Creates an HTTP task scheduled {@code delaySeconds} from now.
     *
     * @param queueName    short queue id
     * @param url          target URL
     * @param payload      request body; may be {@code null} or empty for no body
     * @param delaySeconds delay relative to the current system time
     * @return the created task
     */
    public Task createDelayedTask(String queueName, String url, String payload,
            long delaySeconds) {
        Date scheduleTime = new Date(System.currentTimeMillis() + (delaySeconds * 1000));
        return createHttpTask(queueName, url, payload, scheduleTime, null);
    }

    /**
     * Creates a JSON HTTP task with a caller-chosen name so the service can reject
     * duplicates of the same name.
     *
     * @param queueName short queue id
     * @param url       target URL
     * @param payload   request body; may be {@code null} or empty for no body
     * @param taskName  short task id used as the deduplication key
     * @return the created task
     */
    public Task createTaskWithDeduplication(String queueName, String url, String payload,
            String taskName) {
        String fullTaskName =
                TaskName.of(projectId, location, queueName, taskName).toString();
        HttpRequest httpRequest = newPostRequest(url, payload)
                .putHeaders(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE)
                .build();
        Task task = Task.newBuilder()
                .setName(fullTaskName)
                .setHttpRequest(httpRequest)
                .build();
        return client.createTask(queuePath(queueName), task);
    }

    /**
     * Creates one task per request, sequentially.
     *
     * <p>This is not atomic: if a creation throws, tasks created before it remain in the
     * queue and the exception propagates.
     *
     * @param queueName    short queue id
     * @param taskRequests requests to submit, in order
     * @return the created tasks, in the same order
     */
    public List<Task> createTasksInBatch(String queueName, List<TaskRequest> taskRequests) {
        List<Task> createdTasks = new ArrayList<>(taskRequests.size());
        for (TaskRequest taskRequest : taskRequests) {
            createdTasks.add(createHttpTask(
                    queueName,
                    taskRequest.getUrl(),
                    taskRequest.getPayload(),
                    taskRequest.getScheduleTime(),
                    taskRequest.getHeaders()));
        }
        return createdTasks;
    }

    /** Full resource path of the named queue in this project and location. */
    private String queuePath(String queueName) {
        return QueueName.of(projectId, location, queueName).toString();
    }

    /** Starts a POST request to {@code url}; the body is set only for a non-empty payload. */
    private static HttpRequest.Builder newPostRequest(String url, String payload) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .setUrl(url)
                .setHttpMethod(HttpMethod.POST);
        if (payload != null && !payload.isEmpty()) {
            builder.setBody(ByteString.copyFromUtf8(payload));
        }
        return builder;
    }

    /** Mutable description of one task to create; only url and payload are required. */
    public static class TaskRequest {
        private String url;
        private String payload;
        private Date scheduleTime;
        private Map<String, String> headers;

        /**
         * @param url     target URL
         * @param payload request body
         */
        public TaskRequest(String url, String payload) {
            this.url = url;
            this.payload = payload;
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        public String getPayload() {
            return payload;
        }

        public void setPayload(String payload) {
            this.payload = payload;
        }

        public Date getScheduleTime() {
            return scheduleTime;
        }

        public void setScheduleTime(Date scheduleTime) {
            this.scheduleTime = scheduleTime;
        }

        public Map<String, String> getHeaders() {
            return headers;
        }

        public void setHeaders(Map<String, String> headers) {
            this.headers = headers;
        }
    }
}
2. Task Management Operations
public class TaskManager {
private final CloudTasksClient client;
private final String projectId;
private final String location;
public TaskManager(CloudTasksClient client, String projectId, String location) {
this.client = client;
this.projectId = projectId;
this.location = location;
}
// Get a task
public Task getTask(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
return client.getTask(taskPath);
}
// List tasks in a queue
public List<Task> listTasks(String queueName) {
List<Task> tasks = new ArrayList<>();
String queuePath = QueueName.of(projectId, location, queueName).toString();
for (Task task : client.listTasks(queuePath).iterateAll()) {
tasks.add(task);
}
return tasks;
}
// Delete a task
public void deleteTask(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
client.deleteTask(taskPath);
}
// Purge all tasks in a queue
public void purgeQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
client.purgeQueue(queuePath);
}
// Pause a task (by updating its schedule time)
public Task pauseTask(String queueName, String taskName, long pauseSeconds) {
Task task = getTask(queueName, taskName);
// Calculate new schedule time
long newScheduleTime = System.currentTimeMillis() + (pauseSeconds * 1000);
Timestamp newTimestamp = Timestamp.newBuilder()
.setSeconds(newScheduleTime / 1000)
.build();
Task updatedTask = task.toBuilder()
.setScheduleTime(newTimestamp)
.build();
return client.updateTask(updatedTask);
}
// Run a task immediately
public void runTaskNow(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
client.runTask(taskPath);
}
// Get task statistics
public QueueStats getQueueStats(String queueName) {
Queue queue = client.getQueue(QueueName.of(projectId, location, queueName).toString());
return queue.getStats();
}
}
Task Handler Implementation
1. Spring Boot Task Handler
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
import java.util.logging.Logger;
@RestController
@RequestMapping("/tasks")
public class TaskHandlerController {
private static final Logger logger = Logger.getLogger(TaskHandlerController.class.getName());
private final ObjectMapper objectMapper = new ObjectMapper();
private final EmailService emailService;
private final DataProcessingService dataProcessingService;
private final NotificationService notificationService;
public TaskHandlerController(EmailService emailService,
DataProcessingService dataProcessingService,
NotificationService notificationService) {
this.emailService = emailService;
this.dataProcessingService = dataProcessingService;
this.notificationService = notificationService;
}
// Generic task handler
@PostMapping("/process")
public ResponseEntity<String> handleTask(@RequestBody String payload,
HttpServletRequest request) {
try {
// Log task execution
logger.info("Received task with payload: " + payload);
// Parse the payload
TaskRequest taskRequest = objectMapper.readValue(payload, TaskRequest.class);
// Route to appropriate handler based on task type
switch (taskRequest.getType()) {
case "SEND_EMAIL":
handleSendEmail(taskRequest);
break;
case "PROCESS_DATA":
handleProcessData(taskRequest);
break;
case "SEND_NOTIFICATION":
handleSendNotification(taskRequest);
break;
case "CLEANUP":
handleCleanup(taskRequest);
break;
default:
logger.warning("Unknown task type: " + taskRequest.getType());
return ResponseEntity.badRequest().body("Unknown task type");
}
return ResponseEntity.ok("Task processed successfully");
} catch (Exception e) {
logger.severe("Failed to process task: " + e.getMessage());
// Return 5xx to trigger retry, 4xx to fail permanently
if (shouldRetry(e)) {
return ResponseEntity.status(503).body("Temporary failure");
} else {
return ResponseEntity.badRequest().body("Permanent failure");
}
}
}
// Email task handler
@PostMapping("/send-email")
public ResponseEntity<String> handleEmailTask(@RequestBody EmailTaskRequest request) {
try {
logger.info("Processing email task for: " + request.getToEmail());
emailService.sendEmail(
request.getToEmail(),
request.getSubject(),
request.getBody(),
request.getTemplateId()
);
logger.info("Email sent successfully to: " + request.getToEmail());
return ResponseEntity.ok("Email sent");
} catch (Exception e) {
logger.severe("Failed to send email: " + e.getMessage());
return ResponseEntity.status(503).body("Failed to send email");
}
}
// Data processing task handler
@PostMapping("/process-data")
public ResponseEntity<String> handleDataProcessing(@RequestBody DataProcessingRequest request) {
try {
logger.info("Processing data for batch: " + request.getBatchId());
dataProcessingService.processBatch(
request.getBatchId(),
request.getData(),
request.getOptions()
);
logger.info("Data processing completed for batch: " + request.getBatchId());
return ResponseEntity.ok("Data processed");
} catch (Exception e) {
logger.severe("Data processing failed: " + e.getMessage());
return ResponseEntity.status(503).body("Data processing failed");
}
}
// Notification task handler
@PostMapping("/send-notification")
public ResponseEntity<String> handleNotification(@RequestBody NotificationRequest request) {
try {
logger.info("Sending notification to user: " + request.getUserId());
notificationService.sendPushNotification(
request.getUserId(),
request.getTitle(),
request.getMessage(),
request.getData()
);
return ResponseEntity.ok("Notification sent");
} catch (Exception e) {
logger.severe("Notification failed: " + e.getMessage());
return ResponseEntity.status(503).body("Notification failed");
}
}
// Health check for task handler
@GetMapping("/health")
public ResponseEntity<String> healthCheck() {
return ResponseEntity.ok("Task handler is healthy");
}
private void handleSendEmail(TaskRequest taskRequest) throws Exception {
EmailTaskRequest emailRequest = objectMapper.convertValue(
taskRequest.getData(), EmailTaskRequest.class);
handleEmailTask(emailRequest);
}
private void handleProcessData(TaskRequest taskRequest) throws Exception {
DataProcessingRequest dataRequest = objectMapper.convertValue(
taskRequest.getData(), DataProcessingRequest.class);
handleDataProcessing(dataRequest);
}
private void handleSendNotification(TaskRequest taskRequest) throws Exception {
NotificationRequest notificationRequest = objectMapper.convertValue(
taskRequest.getData(), NotificationRequest.class);
handleNotification(notificationRequest);
}
private void handleCleanup(TaskRequest taskRequest) {
// Implementation for cleanup tasks
logger.info("Executing cleanup task");
}
private boolean shouldRetry(Exception e) {
// Determine if task should be retried based on exception type
return !(e instanceof IllegalArgumentException ||
e instanceof UnsupportedOperationException);
}
// DTO classes
public static class TaskRequest {
private String type;
private String id;
private Map<String, Object> data;
private int attempt;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public Map<String, Object> getData() { return data; }
public void setData(Map<String, Object> data) { this.data = data; }
public int getAttempt() { return attempt; }
public void setAttempt(int attempt) { this.attempt = attempt; }
}
public static class EmailTaskRequest {
private String toEmail;
private String subject;
private String body;
private String templateId;
// Getters and setters
public String getToEmail() { return toEmail; }
public void setToEmail(String toEmail) { this.toEmail = toEmail; }
public String getSubject() { return subject; }
public void setSubject(String subject) { this.subject = subject; }
public String getBody() { return body; }
public void setBody(String body) { this.body = body; }
public String getTemplateId() { return templateId; }
public void setTemplateId(String templateId) { this.templateId = templateId; }
}
public static class DataProcessingRequest {
private String batchId;
private List<Map<String, Object>> data;
private Map<String, Object> options;
// Getters and setters
public String getBatchId() { return batchId; }
public void setBatchId(String batchId) { this.batchId = batchId; }
public List<Map<String, Object>> getData() { return data; }
public void setData(List<Map<String, Object>> data) { this.data = data; }
public Map<String, Object> getOptions() { return options; }
public void setOptions(Map<String, Object> options) { this.options = options; }
}
public static class NotificationRequest {
private String userId;
private String title;
private String message;
private Map<String, String> data;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public Map<String, String> getData() { return data; }
public void setData(Map<String, String> data) { this.data = data; }
}
}
2. Service Implementations
import org.springframework.stereotype.Service;
import java.util.logging.Logger;
/** Placeholder email sender: logs the request and sleeps to imitate delivery latency. */
@Service
public class EmailService {
    private static final Logger logger = Logger.getLogger(EmailService.class.getName());

    /**
     * Pretends to send an email.
     *
     * @param to         recipient address
     * @param subject    subject line
     * @param body       message body (not used by the simulation)
     * @param templateId template identifier (not used by the simulation)
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *     flag is restored first
     */
    public void sendEmail(String to, String subject, String body, String templateId) {
        final String announcement =
                String.format("Sending email to: %s, Subject: %s", to, subject);
        logger.info(announcement);

        try {
            // Stand-in for the real delivery call.
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Email sending interrupted", interrupted);
        }

        logger.info("Email sent successfully to: " + to);
    }
}
/** Placeholder batch processor: visits each record and sleeps briefly per record. */
@Service
public class DataProcessingService {
    private static final Logger logger = Logger.getLogger(DataProcessingService.class.getName());

    /**
     * Processes every record of a batch in order.
     *
     * @param batchId identifier used in log messages
     * @param data    records to process; must not be {@code null}
     * @param options options handed to each record's processing step
     * @throws IllegalArgumentException if {@code data} is {@code null}
     * @throws RuntimeException if the thread is interrupted while processing
     */
    public void processBatch(String batchId, List<Map<String, Object>> data,
            Map<String, Object> options) {
        if (data == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalArgumentException("Batch " + batchId + " has no data");
        }
        logger.info("Processing batch: " + batchId + " with " + data.size() + " records");
        for (Map<String, Object> record : data) {
            processRecord(record, options);
        }
        logger.info("Batch processing completed: " + batchId);
    }

    /** Simulates the work for one record; restores the interrupt flag if interrupted. */
    private void processRecord(Map<String, Object> record, Map<String, Object> options) {
        try {
            Thread.sleep(10); // Simulate processing time per record
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Data processing interrupted", e);
        }
    }
}
/** Placeholder push-notification sender: logs the request and sleeps to imitate latency. */
@Service
public class NotificationService {
    private static final Logger logger = Logger.getLogger(NotificationService.class.getName());

    /**
     * Pretends to send a push notification.
     *
     * @param userId  recipient user id
     * @param title   notification title
     * @param message notification text (not used by the simulation)
     * @param data    extra key/value payload (not used by the simulation)
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *     flag is restored first
     */
    public void sendPushNotification(String userId, String title, String message,
            Map<String, String> data) {
        final String announcement =
                String.format("Sending notification to user: %s, Title: %s", userId, title);
        logger.info(announcement);

        try {
            // Stand-in for the real push call.
            Thread.sleep(500);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Notification sending interrupted", interrupted);
        }

        logger.info("Notification sent successfully to user: " + userId);
    }
}
Advanced Patterns
1. Task Chaining
public class TaskChainingService {
private final TaskCreator taskCreator;
private final ObjectMapper objectMapper;
public TaskChainingService(TaskCreator taskCreator, ObjectMapper objectMapper) {
this.taskCreator = taskCreator;
this.objectMapper = objectMapper;
}
// Chain multiple tasks together
public void createProcessingPipeline(String initialQueue, String initialPayload) {
try {
// Step 1: Data validation
String validationTaskPayload = objectMapper.writeValueAsString(
new PipelineTask("VALIDATE_DATA", initialPayload, 1));
taskCreator.createHttpTask(initialQueue,
"https://yourapp.com/tasks/validate", validationTaskPayload);
} catch (Exception e) {
throw new RuntimeException("Failed to create processing pipeline", e);
}
}
// Handler for chained tasks
public ResponseEntity<String> handleChainedTask(@RequestBody PipelineTask task) {
try {
switch (task.getStage()) {
case "VALIDATE_DATA":
return handleValidationStage(task);
case "PROCESS_DATA":
return handleProcessingStage(task);
case "NOTIFY_COMPLETION":
return handleNotificationStage(task);
default:
return ResponseEntity.badRequest().body("Unknown pipeline stage");
}
} catch (Exception e) {
// Log and potentially trigger compensation tasks
logger.severe("Pipeline stage failed: " + task.getStage());
return ResponseEntity.status(503).body("Stage failed");
}
}
private ResponseEntity<String> handleValidationStage(PipelineTask task) {
// Validate data
boolean isValid = validateData(task.getPayload());
if (isValid) {
// Proceed to next stage
task.setStage("PROCESS_DATA");
task.setSequence(task.getSequence() + 1);
String nextPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("processing-queue",
"https://yourapp.com/tasks/process", nextPayload);
return ResponseEntity.ok("Validation passed");
} else {
// Trigger compensation
triggerCompensationTask(task);
return ResponseEntity.badRequest().body("Validation failed");
}
}
private ResponseEntity<String> handleProcessingStage(PipelineTask task) {
// Process data
String processedData = processData(task.getPayload());
task.setPayload(processedData);
task.setStage("NOTIFY_COMPLETION");
task.setSequence(task.getSequence() + 1);
String nextPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("notification-queue",
"https://yourapp.com/tasks/notify", nextPayload);
return ResponseEntity.ok("Processing completed");
}
private ResponseEntity<String> handleNotificationStage(PipelineTask task) {
// Send completion notification
sendCompletionNotification(task.getPayload());
return ResponseEntity.ok("Pipeline completed");
}
private boolean validateData(String payload) {
// Implementation
return true;
}
private String processData(String payload) {
// Implementation
return payload;
}
private void sendCompletionNotification(String payload) {
// Implementation
}
private void triggerCompensationTask(PipelineTask task) {
// Create compensation task
try {
String compensationPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("compensation-queue",
"https://yourapp.com/tasks/compensate", compensationPayload);
} catch (Exception e) {
logger.severe("Failed to create compensation task: " + e.getMessage());
}
}
public static class PipelineTask {
private String stage;
private String payload;
private int sequence;
private String pipelineId;
public PipelineTask(String stage, String payload, int sequence) {
this.stage = stage;
this.payload = payload;
this.sequence = sequence;
this.pipelineId = UUID.randomUUID().toString();
}
// Getters and setters
public String getStage() { return stage; }
public void setStage(String stage) { this.stage = stage; }
public String getPayload() { return payload; }
public void setPayload(String payload) { this.payload = payload; }
public int getSequence() { return sequence; }
public void setSequence(int sequence) { this.sequence = sequence; }
public String getPipelineId() { return pipelineId; }
public void setPipelineId(String pipelineId) { this.pipelineId = pipelineId; }
}
}
2. Rate Limiting and Throttling
/**
 * Client-side throttle in front of {@link TaskCreator}: tasks beyond a per-key budget are
 * created with a delay instead of immediately.
 */
public class RateLimitedTaskService {
    private final TaskCreator taskCreator;
    private final Map<String, RateLimiter> rateLimiters;

    public RateLimitedTaskService(TaskCreator taskCreator) {
        this.taskCreator = taskCreator;
        this.rateLimiters = new ConcurrentHashMap<>();
    }

    /**
     * Creates a task immediately when the key's budget allows it, otherwise as a delayed
     * task.
     *
     * <p>The limiter for a key is created on first use; {@code maxPerMinute} passed on later
     * calls for the same key is ignored. Delayed tasks are not counted against the budget.
     *
     * @param queueName    short queue id
     * @param url          target URL
     * @param payload      request body
     * @param rateLimitKey key that selects the limiter
     * @param maxPerMinute budget per 60-second sliding window for a new limiter
     * @return the created task
     */
    public Task createRateLimitedTask(String queueName, String url, String payload,
            String rateLimitKey, int maxPerMinute) {
        RateLimiter limiter = rateLimiters.computeIfAbsent(rateLimitKey,
                k -> new RateLimiter(maxPerMinute, 60));
        if (limiter.tryAcquire()) {
            return taskCreator.createHttpTask(queueName, url, payload);
        }
        // Schedule for later execution
        long delaySeconds = calculateRetryDelay(limiter);
        return taskCreator.createDelayedTask(queueName, url, payload, delaySeconds);
    }

    /**
     * Seconds until the limiter frees a slot, rounded up and never less than one.
     *
     * <p>The clamp is applied after the conversion to seconds; clamping the millisecond
     * value first would truncate every wait below one second to a zero delay.
     */
    private long calculateRetryDelay(RateLimiter limiter) {
        long waitMillis = limiter.getNextAvailableTime() - System.currentTimeMillis();
        long waitSeconds = (waitMillis + 999) / 1000;
        return Math.max(1, waitSeconds);
    }

    /** Sliding-window limiter; all access to the timestamp window is synchronized. */
    private static class RateLimiter {
        private final int maxRequests;
        private final long timeWindowMillis;
        // Deque instead of java.util.Queue: the simple name Queue would clash with the
        // Cloud Tasks Queue type wherever that package is imported.
        private final java.util.Deque<Long> requestTimes;

        public RateLimiter(int maxRequests, long timeWindowSeconds) {
            this.maxRequests = maxRequests;
            this.timeWindowMillis = timeWindowSeconds * 1000;
            this.requestTimes = new java.util.ArrayDeque<>();
        }

        /** Records a request and returns true if the window still has room. */
        public synchronized boolean tryAcquire() {
            long now = System.currentTimeMillis();
            // Remove expired requests
            while (!requestTimes.isEmpty()
                    && requestTimes.peekFirst() < now - timeWindowMillis) {
                requestTimes.pollFirst();
            }
            if (requestTimes.size() < maxRequests) {
                requestTimes.offerLast(now);
                return true;
            }
            return false;
        }

        /** Epoch millis at which the oldest recorded request leaves the window. */
        public synchronized long getNextAvailableTime() {
            // The isEmpty check covers a non-positive budget, where peekFirst() is null.
            if (requestTimes.size() < maxRequests || requestTimes.isEmpty()) {
                return System.currentTimeMillis();
            }
            return requestTimes.peekFirst() + timeWindowMillis;
        }
    }
}
Monitoring and Error Handling
1. Task Monitoring
/**
 * Publishes per-queue metrics and flags tasks that are long overdue.
 *
 * <p>This class does not close the client.
 */
public class TaskMonitor {
    private static final java.util.logging.Logger logger =
            java.util.logging.Logger.getLogger(TaskMonitor.class.getName());

    /** A task scheduled more than this long ago (1 hour) is reported as stalled. */
    private static final long STALLED_THRESHOLD_MILLIS = 3_600_000L;

    private final CloudTasksClient client;
    private final String projectId;
    private final String location;
    private final MetricsService metricsService;

    public TaskMonitor(CloudTasksClient client, String projectId, String location,
            MetricsService metricsService) {
        this.client = client;
        this.projectId = projectId;
        this.location = location;
        this.metricsService = metricsService;
    }

    /** Monitors every queue in this project and location. */
    public void monitorQueues() {
        for (Queue queue : listAllQueues()) {
            monitorQueue(queue);
        }
    }

    /** Records the queue's stats as gauges (when present) and checks for stalled tasks. */
    private void monitorQueue(Queue queue) {
        String queueName = QueueName.parse(queue.getName()).getQueue();
        // Protobuf getters never return null, so presence is tested with hasStats().
        // NOTE(review): QueueStats appears to exist only in the v2beta3 API, and
        // getDispatchedCount() may not be one of its accessors - confirm against the
        // client version in use.
        if (queue.hasStats()) {
            QueueStats stats = queue.getStats();
            metricsService.recordGauge("cloud_tasks.tasks_dispatched",
                    stats.getDispatchedCount(), queueName);
            metricsService.recordGauge("cloud_tasks.concurrent_dispatches",
                    stats.getConcurrentDispatchesCount(), queueName);
            metricsService.recordGauge("cloud_tasks.oldest_estimated_arrival_time",
                    stats.getOldestEstimatedArrivalTime().getSeconds(), queueName);
        }
        checkForStalledTasks(queueName);
    }

    /** Warns and counts each task whose schedule time is older than the threshold. */
    private void checkForStalledTasks(String queueName) {
        long now = System.currentTimeMillis();
        for (Task task : listTasks(queueName)) {
            long overdueMillis = now - (task.getScheduleTime().getSeconds() * 1000);
            if (overdueMillis > STALLED_THRESHOLD_MILLIS) {
                logger.warning("Stalled task detected: " + task.getName());
                metricsService.incrementCounter("cloud_tasks.stalled_tasks", queueName);
            }
        }
    }

    /** Lists every queue in this project and location, following all result pages. */
    private List<Queue> listAllQueues() {
        List<Queue> queues = new ArrayList<>();
        client.listQueues(LocationName.of(projectId, location)).iterateAll()
                .forEach(queues::add);
        return queues;
    }

    /** Lists every task in the named queue, following all result pages. */
    private List<Task> listTasks(String queueName) {
        List<Task> tasks = new ArrayList<>();
        String queuePath = QueueName.of(projectId, location, queueName).toString();
        client.listTasks(queuePath).iterateAll().forEach(tasks::add);
        return tasks;
    }
}
Best Practices
- Use appropriate queue configurations based on workload
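- Implement proper error handling in task handlers: return a 2xx status only when the task succeeded or should be dropped, because Cloud Tasks retries every non-2xx response (4xx included)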
- Use OIDC tokens for secure task execution
- Monitor queue metrics and set up alerts
- Implement retry logic with exponential backoff
- Use task names for deduplication when needed
- Test task handlers thoroughly
- Implement proper logging and tracing
Google Cloud Tasks provides a robust, scalable solution for managing asynchronous work in your applications, with built-in retry mechanisms, rate limiting, and security features.