Google Cloud Tasks in Java

Overview

Google Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. You can use it to perform work asynchronously outside of a user request.

Key Concepts

  • Queue: Container for tasks
  • Task: Unit of work to be executed
  • Handler: Service that processes tasks
  • HTTP Target: HTTP endpoint that receives tasks
  • App Engine Target: App Engine service that receives tasks

Setup and Dependencies

1. Maven Dependencies

<!-- The Cloud Tasks client version is declared once as a property and referenced below. -->
<properties>
<google.cloud.tasks.version>2.36.0</google.cloud.tasks.version>
</properties>
<dependencies>
<!-- Cloud Tasks client library; its version comes from the google.cloud.tasks.version property -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-tasks</artifactId>
<version>${google.cloud.tasks.version}</version>
</dependency>
<!-- HTTP client for use in task handlers (version pinned inline) -->
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>1.43.3</version>
</dependency>
<!-- Jackson databind for JSON (de)serialization of task payloads (version pinned inline) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Google auth library providing the GoogleCredentials used for authentication (version pinned inline) -->
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.19.0</version>
</dependency>
</dependencies>

2. Authentication Setup

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.CloudTasksSettings;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.IOException;
public class CloudTasksConfig {
public static CloudTasksClient createClient() throws IOException {
// Method 1: Use default credentials (for GCP environments)
CloudTasksSettings settings = CloudTasksSettings.newBuilder().build();
return CloudTasksClient.create(settings);
}
public static CloudTasksClient createClientWithServiceAccount(String credentialsPath) 
throws IOException {
// Method 2: Use service account credentials
GoogleCredentials credentials = GoogleCredentials
.fromStream(new FileInputStream(credentialsPath));
CloudTasksSettings settings = CloudTasksSettings.newBuilder()
.setCredentialsProvider(() -> credentials)
.build();
return CloudTasksClient.create(settings);
}
public static CloudTasksClient createClientWithImpersonation(
String targetServiceAccount) throws IOException {
// Method 3: Use credential impersonation
GoogleCredentials sourceCredentials = GoogleCredentials.getApplicationDefault();
GoogleCredentials impersonatedCredentials = sourceCredentials
.createDelegated(targetServiceAccount);
CloudTasksSettings settings = CloudTasksSettings.newBuilder()
.setCredentialsProvider(() -> impersonatedCredentials)
.build();
return CloudTasksClient.create(settings);
}
}

Basic Queue Management

1. Queue Operations

import com.google.cloud.tasks.v2.*;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
public class QueueManager {
private final CloudTasksClient client;
private final String projectId;
private final String location;
public QueueManager(CloudTasksClient client, String projectId, String location) {
this.client = client;
this.projectId = projectId;
this.location = location;
}
// Create a queue
public Queue createQueue(String queueName, QueueConfig config) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
Queue.Builder queueBuilder = Queue.newBuilder()
.setName(queuePath);
// Set retry config
if (config.getMaxAttempts() > 0) {
RetryConfig retryConfig = RetryConfig.newBuilder()
.setMaxAttempts(config.getMaxAttempts())
.setMaxRetryDuration(Durations.fromSeconds(config.getMaxRetryDurationSeconds()))
.setMinBackoff(Durations.fromSeconds(config.getMinBackoffSeconds()))
.setMaxBackoff(Durations.fromSeconds(config.getMaxBackoffSeconds()))
.build();
queueBuilder.setRetryConfig(retryConfig);
}
// Set rate limits
if (config.getMaxDispatchesPerSecond() > 0) {
RateLimits rateLimits = RateLimits.newBuilder()
.setMaxDispatchesPerSecond(config.getMaxDispatchesPerSecond())
.setMaxConcurrentDispatches(config.getMaxConcurrentDispatches())
.build();
queueBuilder.setRateLimits(rateLimits);
}
Queue queue = queueBuilder.build();
return client.createQueue(LocationName.of(projectId, location), queue);
}
// Get a queue
public Queue getQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
return client.getQueue(queuePath);
}
// List all queues
public List<Queue> listQueues() {
List<Queue> queues = new ArrayList<>();
LocationName parent = LocationName.of(projectId, location);
for (Queue queue : client.listQueues(parent).iterateAll()) {
queues.add(queue);
}
return queues;
}
// Update a queue
public Queue updateQueue(String queueName, QueueConfig newConfig) {
Queue existingQueue = getQueue(queueName);
Queue.Builder updatedBuilder = existingQueue.toBuilder();
// Update retry config
RetryConfig retryConfig = RetryConfig.newBuilder()
.setMaxAttempts(newConfig.getMaxAttempts())
.setMaxRetryDuration(Durations.fromSeconds(newConfig.getMaxRetryDurationSeconds()))
.build();
updatedBuilder.setRetryConfig(retryConfig);
// Update rate limits
RateLimits rateLimits = RateLimits.newBuilder()
.setMaxDispatchesPerSecond(newConfig.getMaxDispatchesPerSecond())
.setMaxConcurrentDispatches(newConfig.getMaxConcurrentDispatches())
.build();
updatedBuilder.setRateLimits(rateLimits);
return client.updateQueue(updatedBuilder.build());
}
// Delete a queue
public void deleteQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
client.deleteQueue(queuePath);
}
// Pause a queue
public Queue pauseQueue(String queueName) {
Queue queue = getQueue(queueName);
Queue pausedQueue = queue.toBuilder().setState(Queue.State.PAUSED).build();
return client.updateQueue(pausedQueue);
}
// Resume a queue
public Queue resumeQueue(String queueName) {
Queue queue = getQueue(queueName);
Queue resumedQueue = queue.toBuilder().setState(Queue.State.RUNNING).build();
return client.updateQueue(resumedQueue);
}
// Configuration class for queue settings
public static class QueueConfig {
private int maxAttempts = 10;
private long maxRetryDurationSeconds = 3600; // 1 hour
private double maxDispatchesPerSecond = 500.0;
private int maxConcurrentDispatches = 1000;
private long minBackoffSeconds = 0; // Immediate retry
private long maxBackoffSeconds = 3600; // 1 hour
// Getters and setters
public int getMaxAttempts() { return maxAttempts; }
public void setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; }
public long getMaxRetryDurationSeconds() { return maxRetryDurationSeconds; }
public void setMaxRetryDurationSeconds(long maxRetryDurationSeconds) { 
this.maxRetryDurationSeconds = maxRetryDurationSeconds; 
}
public double getMaxDispatchesPerSecond() { return maxDispatchesPerSecond; }
public void setMaxDispatchesPerSecond(double maxDispatchesPerSecond) { 
this.maxDispatchesPerSecond = maxDispatchesPerSecond; 
}
public int getMaxConcurrentDispatches() { return maxConcurrentDispatches; }
public void setMaxConcurrentDispatches(int maxConcurrentDispatches) { 
this.maxConcurrentDispatches = maxConcurrentDispatches; 
}
public long getMinBackoffSeconds() { return minBackoffSeconds; }
public void setMinBackoffSeconds(long minBackoffSeconds) { 
this.minBackoffSeconds = minBackoffSeconds; 
}
public long getMaxBackoffSeconds() { return maxBackoffSeconds; }
public void setMaxBackoffSeconds(long maxBackoffSeconds) { 
this.maxBackoffSeconds = maxBackoffSeconds; 
}
}
}

Task Creation and Management

1. HTTP Task Creation

import com.google.cloud.tasks.v2.*;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
/**
 * Creates HTTP-target tasks in queues of one project and location.
 *
 * <p>Every task is an HTTP POST. A {@code Content-Type: application/json} header is added
 * unless the caller already supplied a Content-Type header (matched case-insensitively).
 */
public class TaskCreator {
    private static final String CONTENT_TYPE_HEADER = "Content-Type";
    private static final String JSON_CONTENT_TYPE = "application/json";

    private final CloudTasksClient client;
    private final String projectId;
    private final String location;

    /**
     * @param client    client used for every call
     * @param projectId project that owns the queues
     * @param location  location of the queues
     */
    public TaskCreator(CloudTasksClient client, String projectId, String location) {
        this.client = client;
        this.projectId = projectId;
        this.location = location;
    }

    /** Creates an unscheduled HTTP task with no extra headers. */
    public Task createHttpTask(String queueName, String url, String payload) {
        return createHttpTask(queueName, url, payload, null, null);
    }

    /**
     * Creates an HTTP task with optional scheduling and headers.
     *
     * @param queueName    short queue id
     * @param url          target URL
     * @param payload      request body; {@code null} or empty means no body is set
     * @param scheduleTime earliest dispatch time, or {@code null} to leave it unset
     * @param headers      extra request headers, or {@code null} for none
     * @return the task as returned by the service
     */
    public Task createHttpTask(String queueName, String url, String payload,
            Date scheduleTime, Map<String, String> headers) {
        Task.Builder taskBuilder = Task.newBuilder()
                .setHttpRequest(newPostRequest(url, payload, headers).build());
        if (scheduleTime != null) {
            // fromMillis keeps the sub-second part that a seconds-only Timestamp would drop.
            taskBuilder.setScheduleTime(Timestamps.fromMillis(scheduleTime.getTime()));
        }
        return client.createTask(queuePath(queueName), taskBuilder.build());
    }

    /**
     * Creates an HTTP task whose request carries an OIDC token for
     * {@code serviceAccountEmail}.
     *
     * @param payload request body; {@code null} or empty means no body is set
     */
    public Task createHttpTaskWithOidcToken(String queueName, String url, String payload,
            String serviceAccountEmail) {
        HttpRequest httpRequest = newPostRequest(url, payload, null)
                .setOidcToken(OidcToken.newBuilder()
                        .setServiceAccountEmail(serviceAccountEmail)
                        .build())
                .build();
        Task task = Task.newBuilder().setHttpRequest(httpRequest).build();
        return client.createTask(queuePath(queueName), task);
    }

    /** Creates an HTTP task scheduled {@code delaySeconds} from the current system time. */
    public Task createDelayedTask(String queueName, String url, String payload,
            long delaySeconds) {
        Date scheduleTime = new Date(System.currentTimeMillis() + (delaySeconds * 1000));
        return createHttpTask(queueName, url, payload, scheduleTime, null);
    }

    /**
     * Creates an HTTP task with an explicit name so the name acts as a deduplication id.
     *
     * @param taskName short task id, combined with the queue path into the full task name
     * @param payload  request body; {@code null} or empty means no body is set
     */
    public Task createTaskWithDeduplication(String queueName, String url, String payload,
            String taskName) {
        String fullTaskName = TaskName.of(projectId, location, queueName, taskName).toString();
        Task task = Task.newBuilder()
                .setName(fullTaskName)
                .setHttpRequest(newPostRequest(url, payload, null).build())
                .build();
        return client.createTask(queuePath(queueName), task);
    }

    /**
     * Creates one task per request, sequentially.
     *
     * <p>This is a loop of individual create calls, not an atomic batch: if one call throws,
     * tasks created before it remain in the queue.
     *
     * @return the created tasks, in request order
     */
    public List<Task> createTasksInBatch(String queueName, List<TaskRequest> taskRequests) {
        List<Task> createdTasks = new ArrayList<>();
        for (TaskRequest taskRequest : taskRequests) {
            createdTasks.add(createHttpTask(
                    queueName,
                    taskRequest.getUrl(),
                    taskRequest.getPayload(),
                    taskRequest.getScheduleTime(),
                    taskRequest.getHeaders()));
        }
        return createdTasks;
    }

    /** Returns the fully qualified resource name of {@code queueName}. */
    private String queuePath(String queueName) {
        return QueueName.of(projectId, location, queueName).toString();
    }

    /**
     * Starts a POST request builder with the optional body and headers, defaulting the
     * Content-Type to JSON when the caller did not provide one.
     */
    private static HttpRequest.Builder newPostRequest(String url, String payload,
            Map<String, String> headers) {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
                .setUrl(url)
                .setHttpMethod(HttpMethod.POST);
        if (payload != null && !payload.isEmpty()) {
            requestBuilder.setBody(ByteString.copyFromUtf8(payload));
        }
        boolean hasContentType = false;
        if (headers != null) {
            for (Map.Entry<String, String> header : headers.entrySet()) {
                requestBuilder.putHeaders(header.getKey(), header.getValue());
                // Header names are case-insensitive, so "content-type" counts as present.
                if (CONTENT_TYPE_HEADER.equalsIgnoreCase(header.getKey())) {
                    hasContentType = true;
                }
            }
        }
        if (!hasContentType) {
            requestBuilder.putHeaders(CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE);
        }
        return requestBuilder;
    }

    /** Describes one task to create via {@link #createTasksInBatch}. */
    public static class TaskRequest {
        private String url;
        private String payload;
        private Date scheduleTime;
        private Map<String, String> headers;

        public TaskRequest(String url, String payload) {
            this.url = url;
            this.payload = payload;
        }

        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }

        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }

        public Date getScheduleTime() { return scheduleTime; }
        public void setScheduleTime(Date scheduleTime) { this.scheduleTime = scheduleTime; }

        public Map<String, String> getHeaders() { return headers; }
        public void setHeaders(Map<String, String> headers) { this.headers = headers; }
    }
}

2. Task Management Operations

public class TaskManager {
private final CloudTasksClient client;
private final String projectId;
private final String location;
public TaskManager(CloudTasksClient client, String projectId, String location) {
this.client = client;
this.projectId = projectId;
this.location = location;
}
// Get a task
public Task getTask(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
return client.getTask(taskPath);
}
// List tasks in a queue
public List<Task> listTasks(String queueName) {
List<Task> tasks = new ArrayList<>();
String queuePath = QueueName.of(projectId, location, queueName).toString();
for (Task task : client.listTasks(queuePath).iterateAll()) {
tasks.add(task);
}
return tasks;
}
// Delete a task
public void deleteTask(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
client.deleteTask(taskPath);
}
// Purge all tasks in a queue
public void purgeQueue(String queueName) {
String queuePath = QueueName.of(projectId, location, queueName).toString();
client.purgeQueue(queuePath);
}
// Pause a task (by updating its schedule time)
public Task pauseTask(String queueName, String taskName, long pauseSeconds) {
Task task = getTask(queueName, taskName);
// Calculate new schedule time
long newScheduleTime = System.currentTimeMillis() + (pauseSeconds * 1000);
Timestamp newTimestamp = Timestamp.newBuilder()
.setSeconds(newScheduleTime / 1000)
.build();
Task updatedTask = task.toBuilder()
.setScheduleTime(newTimestamp)
.build();
return client.updateTask(updatedTask);
}
// Run a task immediately
public void runTaskNow(String queueName, String taskName) {
String taskPath = TaskName.of(projectId, location, queueName, taskName).toString();
client.runTask(taskPath);
}
// Get task statistics
public QueueStats getQueueStats(String queueName) {
Queue queue = client.getQueue(QueueName.of(projectId, location, queueName).toString());
return queue.getStats();
}
}

Task Handler Implementation

1. Spring Boot Task Handler

import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
import java.util.logging.Logger;
@RestController
@RequestMapping("/tasks")
public class TaskHandlerController {
private static final Logger logger = Logger.getLogger(TaskHandlerController.class.getName());
private final ObjectMapper objectMapper = new ObjectMapper();
private final EmailService emailService;
private final DataProcessingService dataProcessingService;
private final NotificationService notificationService;
public TaskHandlerController(EmailService emailService, 
DataProcessingService dataProcessingService,
NotificationService notificationService) {
this.emailService = emailService;
this.dataProcessingService = dataProcessingService;
this.notificationService = notificationService;
}
// Generic task handler
@PostMapping("/process")
public ResponseEntity<String> handleTask(@RequestBody String payload,
HttpServletRequest request) {
try {
// Log task execution
logger.info("Received task with payload: " + payload);
// Parse the payload
TaskRequest taskRequest = objectMapper.readValue(payload, TaskRequest.class);
// Route to appropriate handler based on task type
switch (taskRequest.getType()) {
case "SEND_EMAIL":
handleSendEmail(taskRequest);
break;
case "PROCESS_DATA":
handleProcessData(taskRequest);
break;
case "SEND_NOTIFICATION":
handleSendNotification(taskRequest);
break;
case "CLEANUP":
handleCleanup(taskRequest);
break;
default:
logger.warning("Unknown task type: " + taskRequest.getType());
return ResponseEntity.badRequest().body("Unknown task type");
}
return ResponseEntity.ok("Task processed successfully");
} catch (Exception e) {
logger.severe("Failed to process task: " + e.getMessage());
// Return 5xx to trigger retry, 4xx to fail permanently
if (shouldRetry(e)) {
return ResponseEntity.status(503).body("Temporary failure");
} else {
return ResponseEntity.badRequest().body("Permanent failure");
}
}
}
// Email task handler
@PostMapping("/send-email")
public ResponseEntity<String> handleEmailTask(@RequestBody EmailTaskRequest request) {
try {
logger.info("Processing email task for: " + request.getToEmail());
emailService.sendEmail(
request.getToEmail(),
request.getSubject(),
request.getBody(),
request.getTemplateId()
);
logger.info("Email sent successfully to: " + request.getToEmail());
return ResponseEntity.ok("Email sent");
} catch (Exception e) {
logger.severe("Failed to send email: " + e.getMessage());
return ResponseEntity.status(503).body("Failed to send email");
}
}
// Data processing task handler
@PostMapping("/process-data")
public ResponseEntity<String> handleDataProcessing(@RequestBody DataProcessingRequest request) {
try {
logger.info("Processing data for batch: " + request.getBatchId());
dataProcessingService.processBatch(
request.getBatchId(),
request.getData(),
request.getOptions()
);
logger.info("Data processing completed for batch: " + request.getBatchId());
return ResponseEntity.ok("Data processed");
} catch (Exception e) {
logger.severe("Data processing failed: " + e.getMessage());
return ResponseEntity.status(503).body("Data processing failed");
}
}
// Notification task handler
@PostMapping("/send-notification")
public ResponseEntity<String> handleNotification(@RequestBody NotificationRequest request) {
try {
logger.info("Sending notification to user: " + request.getUserId());
notificationService.sendPushNotification(
request.getUserId(),
request.getTitle(),
request.getMessage(),
request.getData()
);
return ResponseEntity.ok("Notification sent");
} catch (Exception e) {
logger.severe("Notification failed: " + e.getMessage());
return ResponseEntity.status(503).body("Notification failed");
}
}
// Health check for task handler
@GetMapping("/health")
public ResponseEntity<String> healthCheck() {
return ResponseEntity.ok("Task handler is healthy");
}
private void handleSendEmail(TaskRequest taskRequest) throws Exception {
EmailTaskRequest emailRequest = objectMapper.convertValue(
taskRequest.getData(), EmailTaskRequest.class);
handleEmailTask(emailRequest);
}
private void handleProcessData(TaskRequest taskRequest) throws Exception {
DataProcessingRequest dataRequest = objectMapper.convertValue(
taskRequest.getData(), DataProcessingRequest.class);
handleDataProcessing(dataRequest);
}
private void handleSendNotification(TaskRequest taskRequest) throws Exception {
NotificationRequest notificationRequest = objectMapper.convertValue(
taskRequest.getData(), NotificationRequest.class);
handleNotification(notificationRequest);
}
private void handleCleanup(TaskRequest taskRequest) {
// Implementation for cleanup tasks
logger.info("Executing cleanup task");
}
private boolean shouldRetry(Exception e) {
// Determine if task should be retried based on exception type
return !(e instanceof IllegalArgumentException || 
e instanceof UnsupportedOperationException);
}
// DTO classes
public static class TaskRequest {
private String type;
private String id;
private Map<String, Object> data;
private int attempt;
// Getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public Map<String, Object> getData() { return data; }
public void setData(Map<String, Object> data) { this.data = data; }
public int getAttempt() { return attempt; }
public void setAttempt(int attempt) { this.attempt = attempt; }
}
public static class EmailTaskRequest {
private String toEmail;
private String subject;
private String body;
private String templateId;
// Getters and setters
public String getToEmail() { return toEmail; }
public void setToEmail(String toEmail) { this.toEmail = toEmail; }
public String getSubject() { return subject; }
public void setSubject(String subject) { this.subject = subject; }
public String getBody() { return body; }
public void setBody(String body) { this.body = body; }
public String getTemplateId() { return templateId; }
public void setTemplateId(String templateId) { this.templateId = templateId; }
}
public static class DataProcessingRequest {
private String batchId;
private List<Map<String, Object>> data;
private Map<String, Object> options;
// Getters and setters
public String getBatchId() { return batchId; }
public void setBatchId(String batchId) { this.batchId = batchId; }
public List<Map<String, Object>> getData() { return data; }
public void setData(List<Map<String, Object>> data) { this.data = data; }
public Map<String, Object> getOptions() { return options; }
public void setOptions(Map<String, Object> options) { this.options = options; }
}
public static class NotificationRequest {
private String userId;
private String title;
private String message;
private Map<String, String> data;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
public Map<String, String> getData() { return data; }
public void setData(Map<String, String> data) { this.data = data; }
}
}

2. Service Implementations

import org.springframework.stereotype.Service;
import java.util.logging.Logger;
/**
 * Placeholder email sender: logs the request, waits one second to simulate the send, then
 * logs success.
 */
@Service
public class EmailService {
    private static final Logger logger = Logger.getLogger(EmailService.class.getName());

    /**
     * Simulates sending an email.
     *
     * @param to         recipient address (logged)
     * @param subject    subject line (logged)
     * @param body       message body (unused by the simulation)
     * @param templateId template identifier (unused by the simulation)
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *         flag is restored first
     */
    public void sendEmail(String to, String subject, String body, String templateId) {
        logger.info(String.format("Sending email to: %s, Subject: %s", to, subject));
        pauseForSimulatedSend();
        logger.info("Email sent successfully to: " + to);
    }

    /** Sleeps for the simulated processing time. */
    private static void pauseForSimulatedSend() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Email sending interrupted", interrupted);
        }
    }
}
/**
 * Placeholder batch processor: walks the records of a batch and spends a short, fixed time
 * on each to simulate work.
 */
@Service
public class DataProcessingService {
    private static final Logger logger = Logger.getLogger(DataProcessingService.class.getName());

    /**
     * Processes every record of a batch in order.
     *
     * @param batchId identifier used in the log messages
     * @param data    records to process
     * @param options options handed to each record's processing step
     * @throws RuntimeException if the thread is interrupted while a record is being processed
     */
    public void processBatch(String batchId, List<Map<String, Object>> data,
            Map<String, Object> options) {
        int recordCount = data.size();
        logger.info("Processing batch: " + batchId + " with " + recordCount + " records");
        data.forEach(entry -> processRecord(entry, options));
        logger.info("Batch processing completed: " + batchId);
    }

    /** Simulates the per-record processing time; the arguments are not inspected. */
    private void processRecord(Map<String, Object> record, Map<String, Object> options) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException interrupted) {
            // Restore the flag before surfacing the interruption as an unchecked failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Data processing interrupted", interrupted);
        }
    }
}
/**
 * Placeholder push-notification sender: logs the request, waits half a second to simulate
 * the send, then logs success.
 */
@Service
public class NotificationService {
    private static final Logger logger = Logger.getLogger(NotificationService.class.getName());

    /**
     * Simulates sending a push notification.
     *
     * @param userId  recipient user id (logged)
     * @param title   notification title (logged)
     * @param message notification text (unused by the simulation)
     * @param data    extra key/value data (unused by the simulation)
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
     *         flag is restored first
     */
    public void sendPushNotification(String userId, String title, String message,
            Map<String, String> data) {
        logger.info(String.format("Sending notification to user: %s, Title: %s", userId, title));
        pauseForSimulatedSend();
        logger.info("Notification sent successfully to user: " + userId);
    }

    /** Sleeps for the simulated processing time. */
    private static void pauseForSimulatedSend() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Notification sending interrupted", interrupted);
        }
    }
}

Advanced Patterns

1. Task Chaining

public class TaskChainingService {
private final TaskCreator taskCreator;
private final ObjectMapper objectMapper;
public TaskChainingService(TaskCreator taskCreator, ObjectMapper objectMapper) {
this.taskCreator = taskCreator;
this.objectMapper = objectMapper;
}
// Chain multiple tasks together
public void createProcessingPipeline(String initialQueue, String initialPayload) {
try {
// Step 1: Data validation
String validationTaskPayload = objectMapper.writeValueAsString(
new PipelineTask("VALIDATE_DATA", initialPayload, 1));
taskCreator.createHttpTask(initialQueue, 
"https://yourapp.com/tasks/validate", validationTaskPayload);
} catch (Exception e) {
throw new RuntimeException("Failed to create processing pipeline", e);
}
}
// Handler for chained tasks
public ResponseEntity<String> handleChainedTask(@RequestBody PipelineTask task) {
try {
switch (task.getStage()) {
case "VALIDATE_DATA":
return handleValidationStage(task);
case "PROCESS_DATA":
return handleProcessingStage(task);
case "NOTIFY_COMPLETION":
return handleNotificationStage(task);
default:
return ResponseEntity.badRequest().body("Unknown pipeline stage");
}
} catch (Exception e) {
// Log and potentially trigger compensation tasks
logger.severe("Pipeline stage failed: " + task.getStage());
return ResponseEntity.status(503).body("Stage failed");
}
}
private ResponseEntity<String> handleValidationStage(PipelineTask task) {
// Validate data
boolean isValid = validateData(task.getPayload());
if (isValid) {
// Proceed to next stage
task.setStage("PROCESS_DATA");
task.setSequence(task.getSequence() + 1);
String nextPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("processing-queue", 
"https://yourapp.com/tasks/process", nextPayload);
return ResponseEntity.ok("Validation passed");
} else {
// Trigger compensation
triggerCompensationTask(task);
return ResponseEntity.badRequest().body("Validation failed");
}
}
private ResponseEntity<String> handleProcessingStage(PipelineTask task) {
// Process data
String processedData = processData(task.getPayload());
task.setPayload(processedData);
task.setStage("NOTIFY_COMPLETION");
task.setSequence(task.getSequence() + 1);
String nextPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("notification-queue", 
"https://yourapp.com/tasks/notify", nextPayload);
return ResponseEntity.ok("Processing completed");
}
private ResponseEntity<String> handleNotificationStage(PipelineTask task) {
// Send completion notification
sendCompletionNotification(task.getPayload());
return ResponseEntity.ok("Pipeline completed");
}
private boolean validateData(String payload) {
// Implementation
return true;
}
private String processData(String payload) {
// Implementation
return payload;
}
private void sendCompletionNotification(String payload) {
// Implementation
}
private void triggerCompensationTask(PipelineTask task) {
// Create compensation task
try {
String compensationPayload = objectMapper.writeValueAsString(task);
taskCreator.createHttpTask("compensation-queue", 
"https://yourapp.com/tasks/compensate", compensationPayload);
} catch (Exception e) {
logger.severe("Failed to create compensation task: " + e.getMessage());
}
}
public static class PipelineTask {
private String stage;
private String payload;
private int sequence;
private String pipelineId;
public PipelineTask(String stage, String payload, int sequence) {
this.stage = stage;
this.payload = payload;
this.sequence = sequence;
this.pipelineId = UUID.randomUUID().toString();
}
// Getters and setters
public String getStage() { return stage; }
public void setStage(String stage) { this.stage = stage; }
public String getPayload() { return payload; }
public void setPayload(String payload) { this.payload = payload; }
public int getSequence() { return sequence; }
public void setSequence(int sequence) { this.sequence = sequence; }
public String getPipelineId() { return pipelineId; }
public void setPipelineId(String pipelineId) { this.pipelineId = pipelineId; }
}
}

2. Rate Limiting and Throttling

/**
 * Creates tasks subject to a per-key, client-side rate limit: while a key is under its
 * limit tasks are created immediately, otherwise they are created with a delay.
 *
 * <p>Deferred tasks are not counted against the limiter, so several tasks deferred in the
 * same window receive roughly the same schedule time.
 */
public class RateLimitedTaskService {
    private final TaskCreator taskCreator;
    private final Map<String, RateLimiter> rateLimiters;

    public RateLimitedTaskService(TaskCreator taskCreator) {
        this.taskCreator = taskCreator;
        this.rateLimiters = new ConcurrentHashMap<>();
    }

    /**
     * Creates a task now if the limiter for {@code rateLimitKey} has capacity, otherwise
     * schedules it for when capacity is next expected.
     *
     * @param rateLimitKey key selecting the limiter; a limiter is created on first use
     * @param maxPerMinute limit applied when the limiter for the key is first created
     *                     (ignored on later calls for the same key)
     * @return the created task
     */
    public Task createRateLimitedTask(String queueName, String url, String payload,
            String rateLimitKey, int maxPerMinute) {
        RateLimiter limiter = rateLimiters.computeIfAbsent(rateLimitKey,
                k -> new RateLimiter(maxPerMinute, 60));
        if (limiter.tryAcquire()) {
            return taskCreator.createHttpTask(queueName, url, payload);
        }
        long delaySeconds = calculateRetryDelay(limiter);
        return taskCreator.createDelayedTask(queueName, url, payload, delaySeconds);
    }

    /**
     * Returns the number of whole seconds until the limiter next has capacity, rounded up
     * and never less than one. (Clamping the millisecond value before dividing would yield
     * 0 for any wait shorter than a second.)
     */
    private long calculateRetryDelay(RateLimiter limiter) {
        long waitMillis = limiter.getNextAvailableTime() - System.currentTimeMillis();
        long waitSeconds = (waitMillis + 999) / 1000;
        return Math.max(1, waitSeconds);
    }

    /** Sliding-window limiter: at most {@code maxRequests} acquisitions per time window. */
    private static class RateLimiter {
        private final int maxRequests;
        private final long timeWindowMillis;
        // Acquisition timestamps, oldest first. Fully qualified because the simple name
        // Queue also denotes the Cloud Tasks queue type.
        private final java.util.Deque<Long> requestTimes;

        public RateLimiter(int maxRequests, long timeWindowSeconds) {
            this.maxRequests = maxRequests;
            this.timeWindowMillis = timeWindowSeconds * 1000;
            this.requestTimes = new java.util.ArrayDeque<>();
        }

        /** Records an acquisition and returns true if the window still has capacity. */
        public synchronized boolean tryAcquire() {
            long now = System.currentTimeMillis();
            // Inclusive bound so capacity is free exactly at the time reported by
            // getNextAvailableTime().
            while (!requestTimes.isEmpty()
                    && requestTimes.peekFirst() <= now - timeWindowMillis) {
                requestTimes.pollFirst();
            }
            if (requestTimes.size() < maxRequests) {
                requestTimes.addLast(now);
                return true;
            }
            return false;
        }

        /**
         * Returns the epoch-millisecond time at which the oldest recorded acquisition
         * leaves the window, or the current time if the window is not full.
         */
        public synchronized long getNextAvailableTime() {
            if (requestTimes.size() < maxRequests) {
                return System.currentTimeMillis();
            }
            return requestTimes.peekFirst() + timeWindowMillis;
        }
    }
}

Monitoring and Error Handling

1. Task Monitoring

/**
 * Reports per-queue statistics to a {@code MetricsService} and flags tasks whose schedule
 * time is more than an hour in the past.
 *
 * <p>NOTE(review): {@code QueueStats} and the queue {@code stats} field appear to exist only
 * in the v2beta3 API, where stats are presumably returned only when requested through a read
 * mask - confirm the client version and list request this class should use.
 */
public class TaskMonitor {
    private static final java.util.logging.Logger logger =
            java.util.logging.Logger.getLogger(TaskMonitor.class.getName());

    /** Age, in milliseconds past its schedule time, after which a task counts as stalled. */
    private static final long STALLED_TASK_AGE_MILLIS = 3_600_000L; // 1 hour

    private final CloudTasksClient client;
    private final String projectId;
    private final String location;
    private final MetricsService metricsService;

    public TaskMonitor(CloudTasksClient client, String projectId, String location,
            MetricsService metricsService) {
        this.client = client;
        this.projectId = projectId;
        this.location = location;
        this.metricsService = metricsService;
    }

    /** Monitors every queue in the configured project and location. */
    public void monitorQueues() {
        List<Queue> queues = listAllQueues();
        for (Queue queue : queues) {
            monitorQueue(queue);
        }
    }

    /** Records the queue's statistics (when present) and checks it for stalled tasks. */
    private void monitorQueue(Queue queue) {
        String queueName = QueueName.parse(queue.getName()).getQueue();
        // Protobuf getters return a default instance rather than null, so presence has to be
        // tested with hasStats().
        if (queue.hasStats()) {
            QueueStats stats = queue.getStats();
            // NOTE(review): QueueStats does not appear to expose a dispatched count; the
            // executed-last-minute count is reported under the existing metric name - confirm.
            metricsService.recordGauge("cloud_tasks.tasks_dispatched",
                    stats.getExecutedLastMinuteCount(), queueName);
            metricsService.recordGauge("cloud_tasks.concurrent_dispatches",
                    stats.getConcurrentDispatchesCount(), queueName);
            metricsService.recordGauge("cloud_tasks.oldest_estimated_arrival_time",
                    stats.getOldestEstimatedArrivalTime().getSeconds(), queueName);
        }
        checkForStalledTasks(queueName);
    }

    /** Logs and counts every task scheduled more than the stall threshold ago. */
    private void checkForStalledTasks(String queueName) {
        long now = System.currentTimeMillis();
        for (Task task : listTasks(queueName)) {
            if (!task.hasScheduleTime()) {
                // An unset schedule time reads as epoch 0 and would always look stalled.
                continue;
            }
            long taskAge = now - (task.getScheduleTime().getSeconds() * 1000);
            if (taskAge > STALLED_TASK_AGE_MILLIS) {
                logger.warning("Stalled task detected: " + task.getName());
                metricsService.incrementCounter("cloud_tasks.stalled_tasks", queueName);
            }
        }
    }

    /** Collects every queue of the configured project and location. */
    private List<Queue> listAllQueues() {
        List<Queue> queues = new ArrayList<>();
        LocationName parent = LocationName.of(projectId, location);
        for (Queue queue : client.listQueues(parent).iterateAll()) {
            queues.add(queue);
        }
        return queues;
    }

    /** Collects every task of the given queue. */
    private List<Task> listTasks(String queueName) {
        List<Task> tasks = new ArrayList<>();
        String queuePath = QueueName.of(projectId, location, queueName).toString();
        for (Task task : client.listTasks(queuePath).iterateAll()) {
            tasks.add(task);
        }
        return tasks;
    }
}

Best Practices

  1. Use appropriate queue configurations based on workload
  2. Implement proper error handling in task handlers
  3. Use OIDC tokens for secure task execution
  4. Monitor queue metrics and set up alerts
  5. Configure retries with exponential backoff on the queue (max attempts, min/max backoff) rather than retrying inside task handlers
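  6. Use task names for deduplication when needed, keeping in mind that a task name cannot be reused immediately after the task completes or is deleted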
  7. Test task handlers thoroughly
  8. Implement proper logging and tracing

Google Cloud Tasks provides a robust, scalable solution for managing asynchronous work in your applications, with built-in retry mechanisms, rate limiting, and security features.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper