Asynchronous Task Processing with Google Cloud Tasks in Java

Introduction

Google Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. It provides reliable, asynchronous task execution with built-in retry mechanisms, rate limiting, and seamless integration with other Google Cloud services.

This guide walks through a Java (Spring Boot) implementation of Cloud Tasks: client configuration, queue and task management services, HTTP task handlers, and dispatchers that enqueue work from business code.


Architecture Overview

1. Core Components

  • Queue: Container for tasks with configurable rate limits and retry policies
  • Task: Individual unit of work with payload and scheduling options
  • Handler: Service endpoint that processes tasks
  • Dispatcher: Service that creates and manages tasks

2. Integration Patterns

  • HTTP Targets: Tasks invoke HTTP endpoints
  • App Engine Targets: Native App Engine integration
  • Pub/Sub Integration: Task handlers publish Pub/Sub messages when work completes (Cloud Tasks has no built-in completion trigger)
  • Cloud Functions: Serverless task handlers

Project Setup and Dependencies

1. Maven Dependencies

<dependencies>
<!-- Google Cloud Tasks -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-tasks</artifactId>
<version>2.36.0</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- Google Cloud Core -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<version>2.36.0</version>
</dependency>
<!-- OAuth2 for Authentication -->
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.23.0</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Note: a google-cloud-tasks-test-support artifact does not appear to be published on Maven Central. -->
<!-- In tests, mock CloudTasksClient instead, for example with Mockito from spring-boot-starter-test. -->
</dependencies>

2. Application Configuration

# application.yml
google:
  cloud:
    tasks:
      # project-id sits under google.cloud.tasks so that it binds to CloudTasksProperties
      project-id: ${GOOGLE_CLOUD_PROJECT:my-project}
      location: ${TASKS_LOCATION:us-central1}
      queues:
        email-queue: email-queue
        image-processing-queue: image-processing-queue
        data-export-queue: data-export-queue
        notification-queue: notification-queue
spring:
  application:
    name: cloud-tasks-demo
  jackson:
    property-naming-strategy: SNAKE_CASE
server:
  port: 8080
  servlet:
    context-path: /api
logging:
  level:
    com.google.cloud.tasks: INFO
    com.example.tasks: DEBUG
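
The `${NAME:default}` placeholders in this configuration read a value from the environment (or another property source) and fall back to the text after the colon. The lookup rule can be sketched in plain Java as follows. `PlaceholderSketch` is a hypothetical helper written for illustration only; it is not Spring's resolver, which also handles nesting and multiple property sources.

```java
import java.util.Map;

// Hypothetical helper illustrating "${NAME:default}" resolution; not Spring's actual resolver.
final class PlaceholderSketch {

    // Resolves a single "${NAME}" or "${NAME:default}" expression against the given variables.
    static String resolve(String expression, Map<String, String> variables) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // plain literal, nothing to resolve
        }
        String inner = expression.substring(2, expression.length() - 1);
        int colon = inner.indexOf(':');
        String name = colon < 0 ? inner : inner.substring(0, colon);
        String fallback = colon < 0 ? null : inner.substring(colon + 1);

        String value = variables.get(name);
        if (value != null) {
            return value; // an explicit value wins over the default
        }
        if (fallback != null) {
            return fallback;
        }
        throw new IllegalArgumentException("No value and no default for: " + name);
    }
}
```

With `TASKS_LOCATION` unset, `resolve("${TASKS_LOCATION:us-central1}", Map.of())` falls back to `us-central1`; once the variable is set, its value is used instead.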

Core Cloud Tasks Service

1. Cloud Tasks Configuration

@Configuration
@EnableConfigurationProperties(CloudTasksProperties.class)
@Slf4j
public class CloudTasksConfig {
private final CloudTasksProperties properties;
public CloudTasksConfig(CloudTasksProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean
public CloudTasksClient cloudTasksClient() {
try {
return CloudTasksClient.create();
} catch (IOException e) {
throw new RuntimeException("Failed to create Cloud Tasks client", e);
}
}
// QueueService and TaskService are annotated with @Service and use constructor
// injection, so component scanning already registers them. Declaring them again
// as @Bean methods here would produce duplicate bean definitions with the same names.
}
@ConfigurationProperties(prefix = "google.cloud.tasks")
@Data
public class CloudTasksProperties {
private String projectId;
private String location;
private Map<String, String> queues = new HashMap<>();
public String getQueuePath(String queueName) {
if (!queues.containsKey(queueName)) {
throw new IllegalArgumentException("Unknown queue: " + queueName);
}
String queueId = queues.get(queueName);
return QueueName.of(projectId, location, queueId).toString();
}
public String getQueueId(String queueName) {
if (!queues.containsKey(queueName)) {
throw new IllegalArgumentException("Unknown queue: " + queueName);
}
return queues.get(queueName);
}
}
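
`getQueuePath` returns the fully qualified resource name that the Cloud Tasks API expects. As a rough illustration of the string format that `QueueName` and `TaskName` produce, the sketch below builds the same paths by hand. `ResourceNameSketch` and the sample IDs are hypothetical; real code should keep using the generated name classes.

```java
// Hypothetical illustration of Cloud Tasks resource-name formats.
final class ResourceNameSketch {

    // Queue name: projects/{project}/locations/{location}/queues/{queue}
    static String queuePath(String projectId, String location, String queueId) {
        return String.format("projects/%s/locations/%s/queues/%s", projectId, location, queueId);
    }

    // Task name: the queue path followed by /tasks/{task}
    static String taskPath(String projectId, String location, String queueId, String taskId) {
        return queuePath(projectId, location, queueId) + "/tasks/" + taskId;
    }
}
```

For the defaults in `application.yml`, the email queue would resolve to `projects/my-project/locations/us-central1/queues/email-queue`.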

2. Queue Management Service

@Service
@Slf4j
public class QueueService {
private final CloudTasksClient cloudTasksClient;
private final CloudTasksProperties properties;
public QueueService(CloudTasksClient cloudTasksClient, 
CloudTasksProperties properties) {
this.cloudTasksClient = cloudTasksClient;
this.properties = properties;
}
/**
* Create a queue with the specified configuration (the call fails if the queue already exists)
*/
public Queue createQueue(String queueName, QueueConfig config) {
try {
String queuePath = properties.getQueuePath(queueName);
Queue.Builder queueBuilder = Queue.newBuilder()
.setName(queuePath);
// Configure retry settings
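// Build a RetryConfig if any retry-related field is set, not only maxAttempts or maxRetryDuration
if (config.getMaxAttempts() != null || config.getMaxRetryDuration() != null
|| config.getMinBackoff() != null || config.getMaxBackoff() != null
|| config.getMaxDoublings() != null) {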
RetryConfig.Builder retryBuilder = RetryConfig.newBuilder();
if (config.getMaxAttempts() != null) {
retryBuilder.setMaxAttempts(config.getMaxAttempts());
}
if (config.getMaxRetryDuration() != null) {
retryBuilder.setMaxRetryDuration(config.getMaxRetryDuration());
}
if (config.getMinBackoff() != null) {
retryBuilder.setMinBackoff(config.getMinBackoff());
}
if (config.getMaxBackoff() != null) {
retryBuilder.setMaxBackoff(config.getMaxBackoff());
}
if (config.getMaxDoublings() != null) {
retryBuilder.setMaxDoublings(config.getMaxDoublings());
}
queueBuilder.setRetryConfig(retryBuilder.build());
}
// Configure rate limits
if (config.getMaxDispatchesPerSecond() != null || config.getMaxConcurrentDispatches() != null) {
RateLimits.Builder rateLimitsBuilder = RateLimits.newBuilder();
if (config.getMaxDispatchesPerSecond() != null) {
rateLimitsBuilder.setMaxDispatchesPerSecond(config.getMaxDispatchesPerSecond());
}
if (config.getMaxConcurrentDispatches() != null) {
rateLimitsBuilder.setMaxConcurrentDispatches(config.getMaxConcurrentDispatches());
}
queueBuilder.setRateLimits(rateLimitsBuilder.build());
}
Queue queue = queueBuilder.build();
Queue createdQueue = cloudTasksClient.createQueue(
LocationName.of(properties.getProjectId(), properties.getLocation()),
queue
);
log.info("Created queue: {} with config: {}", queueName, config);
return createdQueue;
} catch (Exception e) {
log.error("Failed to create queue: {}", queueName, e);
throw new QueueCreationException("Failed to create queue: " + queueName, e);
}
}
/**
* Get queue information
*/
public Queue getQueue(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
return cloudTasksClient.getQueue(queuePath);
} catch (Exception e) {
log.error("Failed to get queue: {}", queueName, e);
throw new QueueOperationException("Failed to get queue: " + queueName, e);
}
}
/**
* Delete a queue
*/
public void deleteQueue(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
cloudTasksClient.deleteQueue(queuePath);
log.info("Deleted queue: {}", queueName);
} catch (Exception e) {
log.error("Failed to delete queue: {}", queueName, e);
throw new QueueOperationException("Failed to delete queue: " + queueName, e);
}
}
/**
* List all queues in the location
*/
public List<Queue> listQueues() {
try {
LocationName parent = LocationName.of(properties.getProjectId(), properties.getLocation());
CloudTasksClient.ListQueuesPagedResponse response = cloudTasksClient.listQueues(parent);
List<Queue> queues = new ArrayList<>();
for (Queue queue : response.iterateAll()) {
queues.add(queue);
}
return queues;
} catch (Exception e) {
log.error("Failed to list queues", e);
throw new QueueOperationException("Failed to list queues", e);
}
}
/**
* Purge all tasks from a queue
*/
public void purgeQueue(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
cloudTasksClient.purgeQueue(queuePath);
log.info("Purged queue: {}", queueName);
} catch (Exception e) {
log.error("Failed to purge queue: {}", queueName, e);
throw new QueueOperationException("Failed to purge queue: " + queueName, e);
}
}
/**
* Pause a queue (stop task execution)
*/
public void pauseQueue(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
cloudTasksClient.pauseQueue(queuePath);
log.info("Paused queue: {}", queueName);
} catch (Exception e) {
log.error("Failed to pause queue: {}", queueName, e);
throw new QueueOperationException("Failed to pause queue: " + queueName, e);
}
}
/**
* Resume a paused queue
*/
public void resumeQueue(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
cloudTasksClient.resumeQueue(queuePath);
log.info("Resumed queue: {}", queueName);
} catch (Exception e) {
log.error("Failed to resume queue: {}", queueName, e);
throw new QueueOperationException("Failed to resume queue: " + queueName, e);
}
}
/**
* Get queue statistics
*/
public QueueStats getQueueStats(String queueName) {
try {
Queue queue = getQueue(queueName);
// Note: Cloud Tasks doesn't provide direct statistics in the API
// You would need to use Cloud Monitoring for detailed metrics
return QueueStats.builder()
.queueName(queueName)
.concurrentDispatchLimit(queue.getRateLimits().getMaxConcurrentDispatches())
.maxDispatchesPerSecond(queue.getRateLimits().getMaxDispatchesPerSecond())
.build();
} catch (Exception e) {
log.error("Failed to get queue stats: {}", queueName, e);
throw new QueueOperationException("Failed to get queue stats: " + queueName, e);
}
}
@Data
@Builder
public static class QueueConfig {
private Integer maxAttempts;
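// RetryConfig's setters take protobuf durations, so the type is spelled out to avoid a clash with java.time.Duration
private com.google.protobuf.Duration maxRetryDuration;
private com.google.protobuf.Duration minBackoff;
private com.google.protobuf.Duration maxBackoff;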
private Integer maxDoublings;
private Double maxDispatchesPerSecond;
private Integer maxConcurrentDispatches;
}
@Data
@Builder
public static class QueueStats {
private String queueName;
private Integer concurrentDispatchLimit;
private Double maxDispatchesPerSecond;
private Long approximateTaskCount; // Would come from Cloud Monitoring
}
}
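
The retry fields in `QueueConfig` interact: the wait between attempts starts at `minBackoff`, doubles `maxDoublings` times, then grows linearly, and is capped at `maxBackoff`. The sketch below reproduces that documented progression so a configuration can be sanity-checked before it is applied. `BackoffSketch` is a hypothetical helper, and the actual dispatch times are decided by the service and may differ.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch of the documented Cloud Tasks backoff progression; the service decides actual timing.
final class BackoffSketch {

    // Returns the wait before each of the first `retries` retry attempts.
    static List<Duration> schedule(Duration minBackoff, Duration maxBackoff,
                                   int maxDoublings, int retries) {
        List<Duration> waits = new ArrayList<>();
        for (int n = 0; n < retries; n++) {
            Duration wait;
            if (n <= maxDoublings) {
                // Doubling phase: minBackoff * 2^n
                wait = minBackoff.multipliedBy(1L << n);
            } else {
                // Linear phase: grow by minBackoff * 2^maxDoublings on every further retry
                long step = 1L << maxDoublings;
                wait = minBackoff.multipliedBy(step * (n - maxDoublings + 1));
            }
            // Never wait longer than maxBackoff
            waits.add(wait.compareTo(maxBackoff) > 0 ? maxBackoff : wait);
        }
        return waits;
    }
}
```

With a 10 s minimum, a 300 s maximum and 3 doublings, the first eight waits come out as 10, 20, 40, 80, 160, 240, 300 and 300 seconds, which matches the example given in the Cloud Tasks `RetryConfig` reference.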

3. Task Management Service

@Service
@Slf4j
public class TaskService {
private final CloudTasksClient cloudTasksClient;
private final QueueService queueService;
private final ObjectMapper objectMapper;
private final CloudTasksProperties properties;
public TaskService(CloudTasksClient cloudTasksClient,
QueueService queueService,
ObjectMapper objectMapper,
CloudTasksProperties properties) {
this.cloudTasksClient = cloudTasksClient;
this.queueService = queueService;
this.objectMapper = objectMapper;
this.properties = properties;
}
/**
* Create a task with HTTP target
*/
public Task createHttpTask(String queueName, 
String url, 
Object payload, 
TaskOptions options) {
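try {
// Fall back to empty options so that callers may pass null
if (options == null) {
options = TaskOptions.builder().build();
}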
String queuePath = properties.getQueuePath(queueName);
Task.Builder taskBuilder = Task.newBuilder();
// Set HTTP request
HttpRequest.Builder httpRequestBuilder = HttpRequest.newBuilder()
.setUrl(url)
.setHttpMethod(HttpMethod.POST);
// Add payload if provided
if (payload != null) {
String jsonPayload = objectMapper.writeValueAsString(payload);
httpRequestBuilder.setBody(ByteString.copyFromUtf8(jsonPayload));
httpRequestBuilder.putHeaders("Content-Type", "application/json");
}
// Add OIDC token for authentication if service account is configured
if (options.getServiceAccountEmail() != null) {
OidcToken oidcToken = OidcToken.newBuilder()
.setServiceAccountEmail(options.getServiceAccountEmail())
.build();
httpRequestBuilder.setOidcToken(oidcToken);
}
taskBuilder.setHttpRequest(httpRequestBuilder.build());
// Set schedule time
if (options.getScheduleTime() != null) {
taskBuilder.setScheduleTime(
Timestamp.newBuilder()
.setSeconds(options.getScheduleTime().getEpochSecond())
.setNanos(options.getScheduleTime().getNano())
.build()
);
}
// Set dispatch deadline
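if (options.getDispatchDeadline() != null) {
// TaskOptions uses java.time.Duration, while the task builder expects a protobuf Duration.
// The protobuf type is fully qualified because both cannot be imported under the same simple name.
taskBuilder.setDispatchDeadline(
com.google.protobuf.Duration.newBuilder()
.setSeconds(options.getDispatchDeadline().getSeconds())
.setNanos(options.getDispatchDeadline().getNano())
.build()
);
}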
// Set task name if provided
if (options.getTaskName() != null) {
String taskPath = TaskName.of(
properties.getProjectId(),
properties.getLocation(),
properties.getQueueId(queueName),
options.getTaskName()
).toString();
taskBuilder.setName(taskPath);
}
Task task = taskBuilder.build();
Task createdTask = cloudTasksClient.createTask(queuePath, task);
log.debug("Created task in queue: {}, task: {}", queueName, createdTask.getName());
return createdTask;
} catch (Exception e) {
log.error("Failed to create task in queue: {}", queueName, e);
throw new TaskCreationException("Failed to create task in queue: " + queueName, e);
}
}
/**
* Create a task with delayed execution
*/
public Task createDelayedTask(String queueName, 
String url, 
Object payload, 
Duration delay) {
TaskOptions options = TaskOptions.builder()
.scheduleTime(Instant.now().plus(delay))
.build();
return createHttpTask(queueName, url, payload, options);
}
/**
* Create multiple tasks in batch
*/
public List<Task> createBatchTasks(String queueName, 
List<BatchTaskRequest> batchRequests) {
try {
String queuePath = properties.getQueuePath(queueName);
List<Task> tasks = new ArrayList<>();
for (BatchTaskRequest request : batchRequests) {
Task task = buildTaskFromRequest(request);
tasks.add(task);
}
// Cloud Tasks doesn't have a batch create API, so we create sequentially
// In production, consider using async operations for better performance
List<Task> createdTasks = new ArrayList<>();
for (Task task : tasks) {
Task createdTask = cloudTasksClient.createTask(queuePath, task);
createdTasks.add(createdTask);
}
log.info("Created {} tasks in queue: {}", createdTasks.size(), queueName);
return createdTasks;
} catch (Exception e) {
log.error("Failed to create batch tasks in queue: {}", queueName, e);
throw new TaskCreationException("Failed to create batch tasks in queue: " + queueName, e);
}
}
/**
* Get task details
*/
public Task getTask(String queueName, String taskName) {
try {
String taskPath = TaskName.of(
properties.getProjectId(),
properties.getLocation(),
properties.getQueueId(queueName),
taskName
).toString();
return cloudTasksClient.getTask(taskPath);
} catch (Exception e) {
log.error("Failed to get task: {} from queue: {}", taskName, queueName, e);
throw new TaskOperationException("Failed to get task: " + taskName, e);
}
}
/**
* Delete a task
*/
public void deleteTask(String queueName, String taskName) {
try {
String taskPath = TaskName.of(
properties.getProjectId(),
properties.getLocation(),
properties.getQueueId(queueName),
taskName
).toString();
cloudTasksClient.deleteTask(taskPath);
log.debug("Deleted task: {} from queue: {}", taskName, queueName);
} catch (Exception e) {
log.error("Failed to delete task: {} from queue: {}", taskName, queueName, e);
throw new TaskOperationException("Failed to delete task: " + taskName, e);
}
}
/**
* List tasks in a queue
*/
public List<Task> listTasks(String queueName) {
try {
String queuePath = properties.getQueuePath(queueName);
CloudTasksClient.ListTasksPagedResponse response = cloudTasksClient.listTasks(queuePath);
List<Task> tasks = new ArrayList<>();
for (Task task : response.iterateAll()) {
tasks.add(task);
}
return tasks;
} catch (Exception e) {
log.error("Failed to list tasks in queue: {}", queueName, e);
throw new TaskOperationException("Failed to list tasks in queue: " + queueName, e);
}
}
/**
* Run a task immediately (bypass schedule)
*/
public void runTask(String queueName, String taskName) {
try {
String taskPath = TaskName.of(
properties.getProjectId(),
properties.getLocation(),
properties.getQueueId(queueName),
taskName
).toString();
cloudTasksClient.runTask(taskPath);
log.debug("Manually ran task: {} in queue: {}", taskName, queueName);
} catch (Exception e) {
log.error("Failed to run task: {} in queue: {}", taskName, queueName, e);
throw new TaskOperationException("Failed to run task: " + taskName, e);
}
}
private Task buildTaskFromRequest(BatchTaskRequest request) throws Exception {
Task.Builder taskBuilder = Task.newBuilder();
HttpRequest.Builder httpRequestBuilder = HttpRequest.newBuilder()
.setUrl(request.getUrl())
.setHttpMethod(HttpMethod.POST);
if (request.getPayload() != null) {
String jsonPayload = objectMapper.writeValueAsString(request.getPayload());
httpRequestBuilder.setBody(ByteString.copyFromUtf8(jsonPayload));
httpRequestBuilder.putHeaders("Content-Type", "application/json");
}
if (request.getServiceAccountEmail() != null) {
OidcToken oidcToken = OidcToken.newBuilder()
.setServiceAccountEmail(request.getServiceAccountEmail())
.build();
httpRequestBuilder.setOidcToken(oidcToken);
}
taskBuilder.setHttpRequest(httpRequestBuilder.build());
if (request.getScheduleTime() != null) {
taskBuilder.setScheduleTime(
Timestamp.newBuilder()
.setSeconds(request.getScheduleTime().getEpochSecond())
.setNanos(request.getScheduleTime().getNano())
.build()
);
}
if (request.getTaskName() != null) {
// For batch operations, task names should be generated or provided carefully
String taskPath = TaskName.of(
properties.getProjectId(),
properties.getLocation(),
properties.getQueueId(request.getQueueName()),
request.getTaskName()
).toString();
taskBuilder.setName(taskPath);
}
return taskBuilder.build();
}
@Data
@Builder
public static class TaskOptions {
private Instant scheduleTime;
private Duration dispatchDeadline;
private String serviceAccountEmail;
private String taskName;
}
@Data
@Builder
public static class BatchTaskRequest {
private String queueName;
private String url;
private Object payload;
private Instant scheduleTime;
private String serviceAccountEmail;
private String taskName;
}
}
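
Setting `taskName` also enables de-duplication: Cloud Tasks rejects a create request whose name matches an existing or recently completed task. One way to take advantage of this is to derive the task ID from a business key, so that a repeated dispatch of the same work maps to the same name. The sketch below hashes the key with SHA-256, which also keeps the ID within the allowed character set (letters, digits, hyphens, underscores). `TaskIdSketch` and the key format are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a stable task ID from a business key.
final class TaskIdSketch {

    // Returns the lowercase hex SHA-256 of the key (64 characters, [0-9a-f] only).
    static String deterministicTaskId(String businessKey) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(businessKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

A key such as `"order-confirmation:" + order.getId()` could then be passed as `TaskOptions.taskName`. Hashing also avoids sequential task names, which the Cloud Tasks documentation advises against for throughput reasons.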

Domain Models and Task Handlers

1. Task Payload Models

// Base task payload
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class TaskPayload {
private String taskId;
private String type;
private Map<String, Object> data;
private Instant createdAt;
private Integer attempt;
public static TaskPayload create(String type, Map<String, Object> data) {
return TaskPayload.builder()
.taskId(UUID.randomUUID().toString())
.type(type)
.data(data)
.createdAt(Instant.now())
.attempt(1)
.build();
}
}
// Email task payload
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmailTaskPayload {
private String to;
private String subject;
private String templateName;
private Map<String, Object> templateData;
private String replyTo;
private List<String> cc;
private List<String> bcc;
}
// Image processing task payload
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ImageProcessingPayload {
private String imageUrl;
private String outputBucket;
private String outputPath;
private List<ImageOperation> operations;
private ImageFormat outputFormat;
private Integer quality;
public enum ImageFormat {
JPEG, PNG, WEBP, AVIF
}
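@Data
@NoArgsConstructor // Jackson needs a no-args constructor when convertValue builds this type
@AllArgsConstructor
@Builder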
public static class ImageOperation {
private OperationType type;
private Map<String, Object> parameters;
public enum OperationType {
RESIZE, CROP, ROTATE, FILTER, WATERMARK, COMPRESS
}
}
}
// Data export task payload
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DataExportPayload {
private String exportId;
private String userId;
private ExportFormat format;
private DateRange dateRange;
private List<String> fields;
private String outputBucket;
private String notificationEmail;
public enum ExportFormat {
CSV, JSON, EXCEL, PDF
}
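@Data
@NoArgsConstructor // Jackson needs a no-args constructor when convertValue builds this type
@AllArgsConstructor
@Builder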
public static class DateRange {
private Instant startDate;
private Instant endDate;
}
}
// Notification task payload
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class NotificationPayload {
private String userId;
private NotificationType type;
private String title;
private String message;
private Map<String, Object> data;
private String deepLink;
private Priority priority;
public enum NotificationType {
INFO, WARNING, ERROR, SUCCESS, PROMOTIONAL
}
public enum Priority {
LOW, MEDIUM, HIGH, URGENT
}
}

2. Task Handler Controllers

@RestController
@RequestMapping("/tasks")
@Slf4j
public class TaskHandlerController {
private final EmailService emailService;
private final ImageProcessingService imageProcessingService;
private final DataExportService dataExportService;
private final NotificationService notificationService;
private final ObjectMapper objectMapper;
public TaskHandlerController(EmailService emailService,
ImageProcessingService imageProcessingService,
DataExportService dataExportService,
NotificationService notificationService,
ObjectMapper objectMapper) {
this.emailService = emailService;
this.imageProcessingService = imageProcessingService;
this.dataExportService = dataExportService;
this.notificationService = notificationService;
this.objectMapper = objectMapper;
}
/**
* Generic task handler that routes to specific handlers based on task type
*/
@PostMapping("/handle")
public ResponseEntity<TaskResponse> handleTask(@RequestBody TaskPayload payload) {
try {
log.info("Received task: {} - attempt: {}", payload.getType(), payload.getAttempt());
// The typed handlers return ResponseEntity<TaskResponse>, so unwrap the body here
TaskResponse response = switch (payload.getType()) {
case "EMAIL" -> handleEmailTask(payload).getBody();
case "IMAGE_PROCESSING" -> handleImageProcessingTask(payload).getBody();
case "DATA_EXPORT" -> handleDataExportTask(payload).getBody();
case "NOTIFICATION" -> handleNotificationTask(payload).getBody();
default -> throw new UnsupportedTaskTypeException("Unsupported task type: " + payload.getType());
};
log.info("Completed task: {} successfully", payload.getTaskId());
return ResponseEntity.ok(response);
} catch (RetryableException e) {
log.warn("Task {} failed with retryable error: {}", payload.getTaskId(), e.getMessage());
return ResponseEntity.status(503) // Service Unavailable triggers retry
.body(TaskResponse.retry(payload.getTaskId(), e.getMessage()));
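} catch (Exception e) {
log.error("Task {} failed with non-retryable error: {}", payload.getTaskId(), e.getMessage(), e);
// Cloud Tasks retries every non-2xx response according to the queue's retry config,
// so a 400 would be retried as well. Acknowledge with 200 and report the failure
// in the body to stop further attempts.
return ResponseEntity.ok()
.body(TaskResponse.failure(payload.getTaskId(), e.getMessage()));
}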
}
/**
* Email task handler
*/
@PostMapping("/email")
public ResponseEntity<TaskResponse> handleEmailTask(@RequestBody TaskPayload payload) {
try {
EmailTaskPayload emailPayload = objectMapper.convertValue(
payload.getData(), EmailTaskPayload.class);
emailService.sendEmail(
emailPayload.getTo(),
emailPayload.getSubject(),
emailPayload.getTemplateName(),
emailPayload.getTemplateData(),
emailPayload.getReplyTo(),
emailPayload.getCc(),
emailPayload.getBcc()
);
return ResponseEntity.ok(TaskResponse.success(payload.getTaskId(), "Email sent successfully"));
} catch (EmailServiceException e) {
if (e.isRetryable()) {
throw new RetryableException("Email service temporarily unavailable", e);
}
throw new NonRetryableException("Email sending failed permanently", e);
}
}
/**
* Image processing task handler
*/
@PostMapping("/image-processing")
public ResponseEntity<TaskResponse> handleImageProcessingTask(@RequestBody TaskPayload payload) {
try {
ImageProcessingPayload imagePayload = objectMapper.convertValue(
payload.getData(), ImageProcessingPayload.class);
String processedImageUrl = imageProcessingService.processImage(
imagePayload.getImageUrl(),
imagePayload.getOutputBucket(),
imagePayload.getOutputPath(),
imagePayload.getOperations(),
imagePayload.getOutputFormat(),
imagePayload.getQuality()
);
Map<String, Object> result = Map.of("processedImageUrl", processedImageUrl);
return ResponseEntity.ok(TaskResponse.success(payload.getTaskId(), result));
} catch (ImageProcessingException e) {
if (e.isRetryable()) {
throw new RetryableException("Image processing temporarily failed", e);
}
throw new NonRetryableException("Image processing failed permanently", e);
}
}
/**
* Data export task handler
*/
@PostMapping("/data-export")
public ResponseEntity<TaskResponse> handleDataExportTask(@RequestBody TaskPayload payload) {
try {
DataExportPayload exportPayload = objectMapper.convertValue(
payload.getData(), DataExportPayload.class);
String exportUrl = dataExportService.exportData(
exportPayload.getExportId(),
exportPayload.getUserId(),
exportPayload.getFormat(),
exportPayload.getDateRange(),
exportPayload.getFields(),
exportPayload.getOutputBucket()
);
// Send notification if email is provided
if (exportPayload.getNotificationEmail() != null) {
emailService.sendExportCompletionNotification(
exportPayload.getNotificationEmail(),
exportUrl,
exportPayload.getFormat()
);
}
Map<String, Object> result = Map.of("exportUrl", exportUrl);
return ResponseEntity.ok(TaskResponse.success(payload.getTaskId(), result));
} catch (DataExportException e) {
if (e.isRetryable()) {
throw new RetryableException("Data export temporarily failed", e);
}
throw new NonRetryableException("Data export failed permanently", e);
}
}
/**
* Notification task handler
*/
@PostMapping("/notification")
public ResponseEntity<TaskResponse> handleNotificationTask(@RequestBody TaskPayload payload) {
try {
NotificationPayload notificationPayload = objectMapper.convertValue(
payload.getData(), NotificationPayload.class);
notificationService.sendNotification(
notificationPayload.getUserId(),
notificationPayload.getType(),
notificationPayload.getTitle(),
notificationPayload.getMessage(),
notificationPayload.getData(),
notificationPayload.getDeepLink(),
notificationPayload.getPriority()
);
return ResponseEntity.ok(TaskResponse.success(payload.getTaskId(), "Notification sent"));
} catch (NotificationException e) {
if (e.isRetryable()) {
throw new RetryableException("Notification service temporarily unavailable", e);
}
throw new NonRetryableException("Notification sending failed permanently", e);
}
}
@Data
@Builder
public static class TaskResponse {
private String taskId;
private String status;
private String message;
private Map<String, Object> result;
private Instant processedAt;
public static TaskResponse success(String taskId, Object result) {
return TaskResponse.builder()
.taskId(taskId)
.status("SUCCESS")
.result(result instanceof Map ? (Map<String, Object>) result : Map.of("result", result))
.processedAt(Instant.now())
.build();
}
public static TaskResponse success(String taskId, String message) {
return TaskResponse.builder()
.taskId(taskId)
.status("SUCCESS")
.message(message)
.processedAt(Instant.now())
.build();
}
public static TaskResponse retry(String taskId, String message) {
return TaskResponse.builder()
.taskId(taskId)
.status("RETRY")
.message(message)
.processedAt(Instant.now())
.build();
}
public static TaskResponse failure(String taskId, String message) {
return TaskResponse.builder()
.taskId(taskId)
.status("FAILED")
.message(message)
.processedAt(Instant.now())
.build();
}
}
}
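
Cloud Tasks decides whether to retry from the HTTP status alone: a 2xx response marks the task as done, and any other status counts as a failed attempt that is retried under the queue's retry configuration. It also reports the number of previous retries in the `X-CloudTasks-TaskRetryCount` request header, which is more reliable than an attempt counter stored in the payload at creation time. The sketch below captures both rules in plain Java. `HandlerOutcomeSketch` and its `Outcome` enum are hypothetical, and the header map is assumed to use the exact header casing shown.

```java
import java.util.Map;

// Hypothetical helper: maps handler outcomes to statuses and reads the retry count.
final class HandlerOutcomeSketch {

    enum Outcome { SUCCESS, RETRYABLE_FAILURE, PERMANENT_FAILURE }

    // Chooses the HTTP status to return to Cloud Tasks for a given outcome.
    static int statusFor(Outcome outcome) {
        switch (outcome) {
            case RETRYABLE_FAILURE:
                return 503; // non-2xx: Cloud Tasks schedules another attempt
            case SUCCESS:
            case PERMANENT_FAILURE:
            default:
                return 200; // 2xx: the task is removed, even after a permanent failure
        }
    }

    // Reads the retry count sent by Cloud Tasks; returns 0 when absent or malformed.
    static int retryCount(Map<String, String> headers) {
        String raw = headers.get("X-CloudTasks-TaskRetryCount");
        if (raw == null) {
            return 0;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
```

In a Spring controller the same header could be bound directly with `@RequestHeader(value = "X-CloudTasks-TaskRetryCount", defaultValue = "0")`, and permanent failures would typically be logged or recorded elsewhere before the 200 is returned.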

Task Dispatcher Services

1. Task Dispatcher Service

@Service
@Slf4j
public class TaskDispatcherService {
private final TaskService taskService;
private final ObjectMapper objectMapper;
public TaskDispatcherService(TaskService taskService, 
ObjectMapper objectMapper) {
this.taskService = taskService;
this.objectMapper = objectMapper;
}
/**
* Dispatch email task
*/
public Task dispatchEmailTask(EmailTaskPayload emailPayload, 
Duration delay) {
try {
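// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/email";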
TaskPayload taskPayload = TaskPayload.create("EMAIL", 
objectMapper.convertValue(emailPayload, Map.class));
TaskService.TaskOptions options = TaskService.TaskOptions.builder()
.scheduleTime(delay != null ? Instant.now().plus(delay) : null)
.build();
return taskService.createHttpTask("email-queue", taskHandlerUrl, taskPayload, options);
} catch (Exception e) {
log.error("Failed to dispatch email task", e);
throw new TaskDispatchException("Failed to dispatch email task", e);
}
}
/**
* Dispatch image processing task
*/
public Task dispatchImageProcessingTask(ImageProcessingPayload imagePayload,
Duration delay) {
try {
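// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/image-processing";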
TaskPayload taskPayload = TaskPayload.create("IMAGE_PROCESSING",
objectMapper.convertValue(imagePayload, Map.class));
TaskService.TaskOptions options = TaskService.TaskOptions.builder()
.scheduleTime(delay != null ? Instant.now().plus(delay) : null)
.dispatchDeadline(Duration.ofMinutes(10)) // Image processing can take time
.build();
return taskService.createHttpTask("image-processing-queue", taskHandlerUrl, taskPayload, options);
} catch (Exception e) {
log.error("Failed to dispatch image processing task", e);
throw new TaskDispatchException("Failed to dispatch image processing task", e);
}
}
/**
* Dispatch data export task
*/
public Task dispatchDataExportTask(DataExportPayload exportPayload,
Duration delay) {
try {
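// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/data-export";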
TaskPayload taskPayload = TaskPayload.create("DATA_EXPORT",
objectMapper.convertValue(exportPayload, Map.class));
TaskService.TaskOptions options = TaskService.TaskOptions.builder()
.scheduleTime(delay != null ? Instant.now().plus(delay) : null)
.dispatchDeadline(Duration.ofMinutes(30)) // Data export can take significant time
.build();
return taskService.createHttpTask("data-export-queue", taskHandlerUrl, taskPayload, options);
} catch (Exception e) {
log.error("Failed to dispatch data export task", e);
throw new TaskDispatchException("Failed to dispatch data export task", e);
}
}
/**
* Dispatch notification task
*/
public Task dispatchNotificationTask(NotificationPayload notificationPayload,
Duration delay) {
try {
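// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/notification";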
TaskPayload taskPayload = TaskPayload.create("NOTIFICATION",
objectMapper.convertValue(notificationPayload, Map.class));
TaskService.TaskOptions options = TaskService.TaskOptions.builder()
.scheduleTime(delay != null ? Instant.now().plus(delay) : null)
.build();
return taskService.createHttpTask("notification-queue", taskHandlerUrl, taskPayload, options);
} catch (Exception e) {
log.error("Failed to dispatch notification task", e);
throw new TaskDispatchException("Failed to dispatch notification task", e);
}
}
/**
* Dispatch batch email tasks
*/
public List<Task> dispatchBatchEmailTasks(List<EmailTaskPayload> emailPayloads) {
try {
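// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/email";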
List<TaskService.BatchTaskRequest> batchRequests = new ArrayList<>();
for (EmailTaskPayload emailPayload : emailPayloads) {
TaskPayload taskPayload = TaskPayload.create("EMAIL",
objectMapper.convertValue(emailPayload, Map.class));
TaskService.BatchTaskRequest request = TaskService.BatchTaskRequest.builder()
.queueName("email-queue")
.url(taskHandlerUrl)
.payload(taskPayload)
.build();
batchRequests.add(request);
}
return taskService.createBatchTasks("email-queue", batchRequests);
} catch (Exception e) {
log.error("Failed to dispatch batch email tasks", e);
throw new TaskDispatchException("Failed to dispatch batch email tasks", e);
}
}
/**
* Dispatch task with custom configuration
*/
public Task dispatchCustomTask(String queueName,
String taskType,
Object payload,
TaskService.TaskOptions options) {
try {
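// Cloud Tasks HTTP targets need an absolute URL; the host is a placeholder for the handler service's base URL
String taskHandlerUrl = "https://handler.example.com/api/tasks/handle";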
TaskPayload taskPayload = TaskPayload.create(taskType,
objectMapper.convertValue(payload, Map.class));
return taskService.createHttpTask(queueName, taskHandlerUrl, taskPayload, options);
} catch (Exception e) {
log.error("Failed to dispatch custom task of type: {}", taskType, e);
throw new TaskDispatchException("Failed to dispatch custom task: " + taskType, e);
}
}
}
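
Because Cloud Tasks calls the handler over the network, the URL stored in each task must begin with `http://` or `https://`. One possible way to keep handler paths readable in the dispatcher is to compose them with a configured base URL. The sketch below shows such a helper in plain Java. `HandlerUrlSketch`, its method name and the sample host are hypothetical, and in a Spring application the base URL would more likely come from a configuration property.

```java
import java.net.URI;

// Hypothetical helper: joins a configured base URL and a handler path into an absolute URL.
final class HandlerUrlSketch {

    static String absoluteUrl(String baseUrl, String path) {
        URI base = URI.create(baseUrl);
        String scheme = base.getScheme();
        boolean httpScheme = "https".equals(scheme) || "http".equals(scheme);
        if (!httpScheme || base.getHost() == null) {
            throw new IllegalArgumentException("Base URL must be an absolute http(s) URL: " + baseUrl);
        }
        // Join with exactly one slash between base and path
        String trimmedBase = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        String normalizedPath = path.startsWith("/") ? path : "/" + path;
        return trimmedBase + normalizedPath;
    }
}
```

A call such as `absoluteUrl("https://handler.example.com", "/api/tasks/email")` yields `https://handler.example.com/api/tasks/email`, while a relative base such as `/api` is rejected before any task is created.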

2. Business Service Integration

@Service
@Slf4j
public class OrderProcessingService {
private final TaskDispatcherService taskDispatcher;
private final EmailService emailService;
public OrderProcessingService(TaskDispatcherService taskDispatcher,
EmailService emailService) {
this.taskDispatcher = taskDispatcher;
this.emailService = emailService;
}
/**
* Process order and dispatch related tasks
*/
public void processOrder(Order order) {
try {
// Dispatch order confirmation email
dispatchOrderConfirmationEmail(order);
// Dispatch inventory update task
dispatchInventoryUpdateTask(order);
// Dispatch shipping notification (delayed)
dispatchShippingNotificationTask(order);
log.info("Dispatched order processing tasks for order: {}", order.getId());
} catch (Exception e) {
log.error("Failed to process order: {}", order.getId(), e);
throw new OrderProcessingException("Failed to process order: " + order.getId(), e);
}
}
private void dispatchOrderConfirmationEmail(Order order) {
EmailTaskPayload emailPayload = EmailTaskPayload.builder()
.to(order.getCustomerEmail())
.subject("Order Confirmation - #" + order.getId())
.templateName("order-confirmation")
.templateData(createOrderTemplateData(order))
.build();
taskDispatcher.dispatchEmailTask(emailPayload, null);
}
private void dispatchInventoryUpdateTask(Order order) {
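// Placeholder: no task is dispatched here yet. A real implementation would build an
// inventory payload and send it through taskDispatcher.dispatchCustomTask(...)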
log.debug("Dispatching inventory update for order: {}", order.getId());
}
private void dispatchShippingNotificationTask(Order order) {
NotificationPayload notificationPayload = NotificationPayload.builder()
.userId(order.getCustomerId())
.type(NotificationPayload.NotificationType.INFO)
.title("Order Shipped")
.message("Your order #" + order.getId() + " has been shipped")
.deepLink("/orders/" + order.getId())
.priority(NotificationPayload.Priority.MEDIUM)
.build();
// Schedule notification for estimated shipping time (e.g., 2 hours from now)
taskDispatcher.dispatchNotificationTask(notificationPayload, Duration.ofHours(2));
}
private Map<String, Object> createOrderTemplateData(Order order) {
return Map.of(
"orderId", order.getId(),
"customerName", order.getCustomerName(),
"orderDate", order.getOrderDate(),
"items", order.getItems(),
"totalAmount", order.getTotalAmount(),
"shippingAddress", order.getShippingAddress()
);
}
}
// Example domain model
@Data
@Builder
public class Order {
private String id;
private String customerId;
private String customerEmail;
private String customerName;
private Instant orderDate;
private List<OrderItem> items;
private BigDecimal totalAmount;
private Address shippingAddress;
}
@Data
@Builder
public class OrderItem {
private String productId;
private String productName;
private Integer quantity;
private BigDecimal unitPrice;
}
@Data
@Builder
public class Address {
private String street;
private String city;
private String state;
private String zipCode;
private String country;
}

Error Handling and Monitoring

1. Custom Exceptions

public class QueueCreationException extends RuntimeException {
public QueueCreationException(String message) {
super(message);
}
public QueueCreationException(String message, Throwable cause) {
super(message, cause);
}
}
public class QueueOperationException extends RuntimeException {
public QueueOperationException(String message) {
super(message);
}
public QueueOperationException(String message, Throwable cause) {
super(message, cause);
}
}
public class TaskCreationException extends RuntimeException {
public TaskCreationException(String message) {
super(message);
}
public TaskCreationException(String message, Throwable cause) {
super(message, cause);
}
}
public class TaskOperationException extends RuntimeException {
public TaskOperationException(String message) {
super(message);
}
public TaskOperationException(String message, Throwable cause) {
super(message, cause);
}
}
public class TaskDispatchException extends RuntimeException {
public TaskDispatchException(String message) {
super(message);
}
public TaskDispatchException(String message, Throwable cause) {
super(message, cause);
}
}
public class UnsupportedTaskTypeException extends RuntimeException {
public UnsupportedTaskTypeException(String message) {
super(message);
}
}
public class RetryableException extends RuntimeException {
public RetryableException(String message) {
super(message);
}
public RetryableException(String message, Throwable cause) {
super(message, cause);
}
}
public class NonRetryableException extends RuntimeException {
public NonRetryableException(String message) {
super(message);
}
public NonRetryableException(String message, Throwable cause) {
super(message, cause);
}
}
// Service-specific exceptions
public class EmailServiceException extends RuntimeException {
private final boolean retryable;
public EmailServiceException(String message, boolean retryable) {
super(message);
this.retryable = retryable;
}
public EmailServiceException(String message, Throwable cause, boolean retryable) {
super(message, cause);
this.retryable = retryable;
}
public boolean isRetryable() {
return retryable;
}
}
public class ImageProcessingException extends RuntimeException {
private final boolean retryable;
public ImageProcessingException(String message, boolean retryable) {
super(message);
this.retryable = retryable;
}
public boolean isRetryable() {
return retryable;
}
}
public class DataExportException extends RuntimeException {
private final boolean retryable;
public DataExportException(String message, boolean retryable) {
super(message);
this.retryable = retryable;
}
public boolean isRetryable() {
return retryable;
}
}
public class NotificationException extends RuntimeException {
private final boolean retryable;
public NotificationException(String message, boolean retryable) {
super(message);
this.retryable = retryable;
}
public boolean isRetryable() {
return retryable;
}
}
public class OrderProcessingException extends RuntimeException {
public OrderProcessingException(String message) {
super(message);
}
public OrderProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
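
The retryable flag on these exceptions only matters if the handler turns it into an HTTP status, because Cloud Tasks treats a 2xx response as success and retries the task on other status codes. The following is a minimal sketch of that mapping, not the article's controller code: `TaskFailureStatus` and `FlaggedException` are hypothetical names, with `FlaggedException` standing in for exceptions such as `EmailServiceException` above.

```java
/** Hypothetical helper: picks the HTTP status a task handler returns for a failure. */
final class TaskFailureStatus {
    static final int OK = 200;
    static final int SERVICE_UNAVAILABLE = 503;

    /** Stand-in for the article's exceptions that carry a retryable flag. */
    static class FlaggedException extends RuntimeException {
        private final boolean retryable;

        FlaggedException(String message, boolean retryable) {
            super(message);
            this.retryable = retryable;
        }

        boolean isRetryable() {
            return retryable;
        }
    }

    /** Returns a non-2xx status when the queue should retry, and 2xx when it should stop. */
    static int statusFor(Throwable error) {
        if (error instanceof FlaggedException) {
            return ((FlaggedException) error).isRetryable() ? SERVICE_UNAVAILABLE : OK;
        }
        if (error instanceof IllegalArgumentException) {
            // Assumption: a malformed payload will never succeed, so acknowledge it
            return OK;
        }
        // Unknown failures are treated as transient so the queue retries them
        return SERVICE_UNAVAILABLE;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(new FlaggedException("SMTP server down", true)));
        System.out.println(statusFor(new FlaggedException("Invalid recipient", false)));
    }
}
```

Returning 2xx for a permanent failure is a design choice: the task is dropped from the queue, so the handler should log or persist the failure before acknowledging it.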

2. Monitoring and Metrics

@Component
@Slf4j
public class TaskMetricsMonitor {
private final MeterRegistry meterRegistry;
private final Counter taskDispatchedCounter;
private final Counter taskCompletedCounter;
private final Counter taskFailedCounter;
private final Timer taskProcessingTimer;
public TaskMetricsMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.taskDispatchedCounter = meterRegistry.counter("tasks.dispatched");
this.taskCompletedCounter = meterRegistry.counter("tasks.completed");
this.taskFailedCounter = meterRegistry.counter("tasks.failed");
this.taskProcessingTimer = meterRegistry.timer("tasks.processing.time");
}
public void recordTaskDispatched(String queueName, String taskType) {
taskDispatchedCounter.increment();
Tags tags = Tags.of(
Tag.of("queue", queueName),
Tag.of("type", taskType)
);
meterRegistry.counter("tasks.dispatched.detailed", tags).increment();
log.debug("Task dispatched - queue: {}, type: {}", queueName, taskType);
}
public void recordTaskCompleted(String queueName, String taskType, Duration processingTime) {
taskCompletedCounter.increment();
taskProcessingTimer.record(processingTime);
Tags tags = Tags.of(
Tag.of("queue", queueName),
Tag.of("type", taskType)
);
meterRegistry.counter("tasks.completed.detailed", tags).increment();
meterRegistry.timer("tasks.processing.time.detailed", tags).record(processingTime);
log.debug("Task completed - queue: {}, type: {}, time: {}ms", 
queueName, taskType, processingTime.toMillis());
}
public void recordTaskFailed(String queueName, String taskType, String errorType) {
taskFailedCounter.increment();
Tags tags = Tags.of(
Tag.of("queue", queueName),
Tag.of("type", taskType),
Tag.of("error_type", errorType)
);
meterRegistry.counter("tasks.failed.detailed", tags).increment();
log.warn("Task failed - queue: {}, type: {}, error: {}", queueName, taskType, errorType);
}
public void recordQueueOperation(String operation, String queueName, boolean success) {
Tags tags = Tags.of(
Tag.of("operation", operation),
Tag.of("queue", queueName),
Tag.of("success", String.valueOf(success))
);
meterRegistry.counter("queue.operations", tags).increment();
}
}

Testing Implementation

1. Unit Tests

@ExtendWith(MockitoExtension.class)
class TaskDispatcherServiceTest {
@Mock
private TaskService taskService;
@Mock
private ObjectMapper objectMapper;
@InjectMocks
private TaskDispatcherService taskDispatcherService;
@Test
void testDispatchEmailTask() throws Exception {
// Given
EmailTaskPayload emailPayload = EmailTaskPayload.builder()
.to("test@example.com")
.subject("Test Email")
.build();
TaskPayload expectedTaskPayload = TaskPayload.create("EMAIL", new HashMap<>());
Task expectedTask = Task.newBuilder().setName("test-task").build();
when(objectMapper.convertValue(emailPayload, Map.class)).thenReturn(new HashMap<>());
when(taskService.createHttpTask(anyString(), anyString(), any(TaskPayload.class), any()))
.thenReturn(expectedTask);
// When
Task result = taskDispatcherService.dispatchEmailTask(emailPayload, null);
// Then
assertNotNull(result);
assertEquals(expectedTask, result);
verify(taskService).createHttpTask(eq("email-queue"), anyString(), any(TaskPayload.class), any());
}
@Test
void testDispatchEmailTaskWithDelay() throws Exception {
// Given
EmailTaskPayload emailPayload = EmailTaskPayload.builder()
.to("test@example.com")
.subject("Test Email")
.build();
Duration delay = Duration.ofMinutes(5);
when(objectMapper.convertValue(emailPayload, Map.class)).thenReturn(new HashMap<>());
when(taskService.createHttpTask(anyString(), anyString(), any(TaskPayload.class), any()))
.thenReturn(Task.newBuilder().build());
// When
taskDispatcherService.dispatchEmailTask(emailPayload, delay);
// Then
verify(taskService).createHttpTask(anyString(), anyString(), any(TaskPayload.class), 
argThat(options -> options.getScheduleTime() != null));
}
}
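// TestRestTemplate is only available when the test starts a real web server
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)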
@ActiveProfiles("test")
class TaskHandlerControllerIntegrationTest {
@Autowired
private TestRestTemplate restTemplate;
@MockBean
private EmailService emailService;
@Test
void testHandleEmailTaskSuccess() {
// Given
TaskPayload taskPayload = TaskPayload.builder()
.taskId("test-task-123")
.type("EMAIL")
.data(Map.of(
"to", "test@example.com",
"subject", "Test Subject"
))
.build();
// When
ResponseEntity<TaskHandlerController.TaskResponse> response = restTemplate.postForEntity(
"/api/tasks/email",
taskPayload,
TaskHandlerController.TaskResponse.class
);
// Then
assertEquals(HttpStatus.OK, response.getStatusCode());
assertNotNull(response.getBody());
assertEquals("SUCCESS", response.getBody().getStatus());
}
@Test
void testHandleEmailTaskRetryableError() {
// Given
TaskPayload taskPayload = TaskPayload.builder()
.taskId("test-task-456")
.type("EMAIL")
.data(new HashMap<>())
.build();
// any() also matches null arguments, which the empty payload above may produce; anyString() does not
doThrow(new EmailServiceException("Service unavailable", true))
.when(emailService).sendEmail(any(), any(), any(), any(), any(), any(), any());
// When
ResponseEntity<TaskHandlerController.TaskResponse> response = restTemplate.postForEntity(
"/api/tasks/email",
taskPayload,
TaskHandlerController.TaskResponse.class
);
// Then
assertEquals(HttpStatus.SERVICE_UNAVAILABLE, response.getStatusCode());
assertNotNull(response.getBody());
assertEquals("RETRY", response.getBody().getStatus());
}
}

Configuration for Production

1. Production Configuration

# application-prod.yml
google:
  cloud:
    project-id: ${GOOGLE_CLOUD_PROJECT}
    tasks:
      location: us-central1
      queues:
        email-queue: email-queue-prod
        image-processing-queue: image-processing-prod
        data-export-queue: data-export-prod
        notification-queue: notification-prod
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
      show-components: always
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        http.server.requests: true
logging:
  level:
    com.google.cloud.tasks: WARN
    com.example.tasks: INFO
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
server:
  port: 8080
  forward-headers-strategy: framework

2. Queue Configuration for Different Workloads

@Configuration
@Slf4j
public class QueueConfiguration {
@Bean
public CommandLineRunner initializeQueues(QueueService queueService) {
return args -> {
// Email queue - high throughput, fast processing
QueueService.QueueConfig emailQueueConfig = QueueService.QueueConfig.builder()
.maxAttempts(5)
.maxRetryDuration(Duration.ofHours(1))
.minBackoff(Duration.ofSeconds(10))
.maxBackoff(Duration.ofMinutes(10))
.maxDispatchesPerSecond(50.0)
.maxConcurrentDispatches(10)
.build();
// Image processing queue - longer tasks, lower concurrency
QueueService.QueueConfig imageQueueConfig = QueueService.QueueConfig.builder()
.maxAttempts(3)
.maxRetryDuration(Duration.ofHours(6))
.minBackoff(Duration.ofMinutes(1))
.maxBackoff(Duration.ofHours(1))
.maxDispatchesPerSecond(5.0)
.maxConcurrentDispatches(3)
.build();
// Data export queue - very long tasks, low concurrency
QueueService.QueueConfig exportQueueConfig = QueueService.QueueConfig.builder()
.maxAttempts(2)
.maxRetryDuration(Duration.ofHours(24))
.minBackoff(Duration.ofMinutes(5))
.maxBackoff(Duration.ofHours(2))
.maxDispatchesPerSecond(1.0)
.maxConcurrentDispatches(2)
.build();
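// Create each queue independently so that one failure (for example, a queue
// that already exists) does not prevent the remaining queues from being created
Map.of(
"email-queue", emailQueueConfig,
"image-processing-queue", imageQueueConfig,
"data-export-queue", exportQueueConfig
).forEach((queueName, queueConfig) -> {
try {
queueService.createQueue(queueName, queueConfig);
log.info("Initialized Cloud Tasks queue: {}", queueName);
} catch (Exception e) {
log.warn("Failed to initialize queue {} (it might already exist)", queueName, e);
}
});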
};
}
}
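
To see what a given pair of minBackoff and maxBackoff values means in practice, it can help to compute the retry intervals they produce. The sketch below follows the behavior described in the Cloud Tasks RetryConfig documentation as I understand it: the interval starts at the minimum backoff, doubles up to a maximum number of doublings, then grows linearly, and is capped at the maximum backoff. `BackoffSchedule` and its method names are hypothetical, and the `maxDoublings` parameter is an assumption, since the `QueueConfig` builder above does not show one.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that estimates retry intervals for a queue's retry settings. */
final class BackoffSchedule {

    /** Interval before the retry with the given zero-based index. */
    static Duration retryInterval(int retryIndex, Duration minBackoff,
                                  Duration maxBackoff, int maxDoublings) {
        long minMillis = minBackoff.toMillis();
        long maxMillis = maxBackoff.toMillis();
        if (retryIndex < 0 || maxDoublings < 0 || minMillis <= 0 || maxMillis < minMillis) {
            throw new IllegalArgumentException("Invalid backoff settings");
        }
        // Doubling phase: stop early once the cap is reached
        long interval = minMillis;
        int doublings = Math.min(retryIndex, maxDoublings);
        for (int i = 0; i < doublings && interval < maxMillis; i++) {
            interval *= 2;
        }
        // Linear phase: each further retry adds one more fully doubled interval
        if (retryIndex > maxDoublings && interval < maxMillis) {
            long steps = (long) retryIndex - maxDoublings + 1;
            interval = steps > maxMillis / interval ? maxMillis : interval * steps;
        }
        return Duration.ofMillis(Math.min(interval, maxMillis));
    }

    /** Intervals for the first {@code retries} retries, in order. */
    static List<Duration> firstIntervals(int retries, Duration minBackoff,
                                         Duration maxBackoff, int maxDoublings) {
        List<Duration> intervals = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            intervals.add(retryInterval(i, minBackoff, maxBackoff, maxDoublings));
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Email queue settings from above: 5 attempts means at most 4 retries
        System.out.println(firstIntervals(4, Duration.ofSeconds(10), Duration.ofMinutes(10), 16));
    }
}
```

For the email queue, four retries starting at 10 seconds stay well inside the one-hour maxRetryDuration, provided the number of doublings is high enough not to matter. The same calculation for the data export queue shows that a single retry after five minutes is all that two attempts allow.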

Conclusion

Google Cloud Tasks provides a robust, fully managed solution for asynchronous task processing in Java applications. Key benefits and best practices:

Architecture Benefits:

  • Fully Managed: No infrastructure management required
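  • Reliable Delivery: Built-in retry mechanisms with configurable backoff (Cloud Tasks has no native dead-letter queue, so tasks that exhaust their retries need separate handling)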
  • Rate Limiting: Configurable rate limits and concurrency control
  • Seamless Integration: Native integration with Google Cloud services

Implementation Patterns:

  • HTTP Task Handlers: REST endpoints for task processing
  • Delayed Execution: Schedule tasks for future execution
  • Batch Processing: Efficient handling of multiple related tasks
  • Error Handling: Differentiate between retryable and non-retryable errors
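
Delayed execution comes down to turning a delay into an absolute schedule time. The sketch below shows one way to do that with `java.time`. `ScheduleTimes` is a hypothetical helper, and the 30-day limit is an assumption based on the published Cloud Tasks quotas, so check the current limits before relying on it.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that converts a delay into an absolute schedule time. */
final class ScheduleTimes {
    // Assumption: schedule times further out than this are rejected by the service
    static final Duration MAX_DELAY = Duration.ofDays(30);

    /** Returns "now" for a missing or non-positive delay, otherwise now plus the delay. */
    static Instant scheduleTime(Clock clock, Duration delay) {
        Instant now = clock.instant();
        if (delay == null || delay.isZero() || delay.isNegative()) {
            return now;
        }
        if (delay.compareTo(MAX_DELAY) > 0) {
            throw new IllegalArgumentException("Delay exceeds the assumed maximum of " + MAX_DELAY);
        }
        return now.plus(delay);
    }

    public static void main(String[] args) {
        Instant when = scheduleTime(Clock.systemUTC(), Duration.ofHours(2));
        // These two values are what a protobuf Timestamp carries
        System.out.println(when.getEpochSecond() + " s, " + when.getNano() + " ns");
    }
}
```

The epoch seconds and nanoseconds of the resulting `Instant` map directly onto the protobuf `Timestamp` that a task's schedule time expects. Passing a `Clock` instead of calling `Instant.now()` keeps the calculation testable with a fixed clock.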

Best Practices:

  • Queue Configuration: Tailor retry policies and rate limits to workload characteristics
  • Idempotent Handlers: Design task handlers to handle duplicate executions safely
  • Monitoring: Implement comprehensive metrics and logging
  • Security: Use OIDC tokens for secure task handler authentication
  • Error Classification: Properly distinguish between transient and permanent failures
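
Because a task can be delivered more than once, an idempotent handler needs some way to recognize a task it has already processed. The following is a minimal sketch of that idea using an in-memory set keyed by task ID. `IdempotentTaskRunner` is a hypothetical class, and a real deployment would need a shared, persistent store (a database row or cache entry, for example) rather than process memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical guard that runs the work for each task ID at most once per process. */
final class IdempotentTaskRunner {
    // In-memory stand-in for a persistent deduplication store
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    /**
     * Runs the work unless this task ID was already processed.
     * Returns true if the work ran, false if the call was a duplicate.
     */
    boolean runOnce(String taskId, Runnable work) {
        if (!claimed.add(taskId)) {
            return false; // duplicate delivery: skip the side effect
        }
        try {
            work.run();
            return true;
        } catch (RuntimeException e) {
            // Release the claim so a later retry can run the work again
            claimed.remove(taskId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentTaskRunner runner = new IdempotentTaskRunner();
        Runnable sendEmail = () -> System.out.println("sending email");
        System.out.println(runner.runOnce("task-123", sendEmail));
        System.out.println(runner.runOnce("task-123", sendEmail));
    }
}
```

The ID could come from the `taskId` field of the payload or from the `X-CloudTasks-TaskName` header that Cloud Tasks sends with HTTP target requests. Releasing the claim on failure matters: without it, a retry of a failed task would be skipped as a duplicate.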

Use Cases:

  • Email Sending: Asynchronous email delivery with retry logic
  • Image Processing: Background processing of images and media files
  • Data Export: Generation of large reports and data exports
  • Notification Delivery: Push notifications and user alerts
  • Order Processing: Complex order fulfillment workflows

By leveraging Google Cloud Tasks with Java, you can build highly scalable, reliable asynchronous processing systems that handle varying workloads efficiently while maintaining simplicity in management and operation.
