Hangfire Alternative in Java: Complete Background Job Processing

Introduction to Java Background Job Processing

While Hangfire is a popular .NET background job framework, Java has several robust alternatives for scheduling, processing, and managing background jobs. This guide covers comprehensive Java solutions for background job processing similar to Hangfire.

Table of Contents

  1. Quartz Scheduler - Enterprise Solution
  2. JobRunr - Modern Hangfire Alternative
  3. Spring Batch - Batch Processing Framework
  4. Custom Implementation with Spring
  5. Distributed Job Processing
  6. Monitoring and Management
  7. Performance Optimization
  8. Real-World Examples

1. Quartz Scheduler - Enterprise Solution

Maven Dependencies

<properties>
<quartz.version>2.3.2</quartz.version>
<spring-boot.version>3.1.0</spring-boot.version>
</properties>
<dependencies>
<!-- Quartz Scheduler -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<!-- Spring Boot Quartz Integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!-- Database for Job Store -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Bundled utility jobs shipped with Quartz (not required for clustering) -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>${quartz.version}</version>
</dependency>
</dependencies>

Quartz Configuration

package com.example.jobs.config;
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.Properties;
@Configuration
public class QuartzConfig {

    // Durable sample job: storeDurably() keeps the JobDetail registered in the
    // scheduler even when no trigger currently references it.
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class)
                .withIdentity("sampleJob", "backgroundJobs")
                .withDescription("Sample background job")
                .storeDurably()
                .build();
    }

    // Fires sampleJobDetail() every 30 seconds, repeating indefinitely.
    @Bean
    public Trigger sampleJobTrigger() {
        return TriggerBuilder.newTrigger()
                .forJob(sampleJobDetail())
                .withIdentity("sampleTrigger", "backgroundJobs")
                .withDescription("Sample job trigger")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(30)
                        .repeatForever())
                .build();
    }

    // Durable email job definition, paired with the cron trigger below.
    @Bean
    public JobDetail emailJobDetail() {
        return JobBuilder.newJob(EmailJob.class)
                .withIdentity("emailJob", "backgroundJobs")
                .withDescription("Email sending job")
                .storeDurably()
                .build();
    }

    // Cron "0 0 9 * * ?" = every day at 09:00 (scheduler's time zone).
    @Bean
    public Trigger emailJobTrigger() {
        return TriggerBuilder.newTrigger()
                .forJob(emailJobDetail())
                .withIdentity("emailTrigger", "backgroundJobs")
                .withDescription("Email job trigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 9 * * ?")) // Daily at 9 AM
                .build();
    }

    // JDBC JobStore configuration for persistence.
    // NOTE(review): the injected DataSource parameter is never used — the connection is
    //   re-declared via the org.quartz.dataSource.* properties below; confirm which data
    //   source should win, or drop the parameter.
    // NOTE(review): isClustered=true combined with an in-memory URL (jdbc:h2:mem:quartz)
    //   cannot share state between nodes — each JVM gets its own private database. A
    //   shared server-mode database is needed for real clustering; verify intent.
    @Bean
    public Properties quartzProperties(DataSource dataSource) {
        Properties properties = new Properties();
        // Configure JobStore: JobStoreTX manages its own transactions (non-JTA).
        properties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        properties.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        properties.setProperty("org.quartz.jobStore.dataSource", "quartzDataSource");
        properties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        properties.setProperty("org.quartz.jobStore.isClustered", "true");
        properties.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000");
        // Configure ThreadPool: 10 worker threads at normal priority.
        properties.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        properties.setProperty("org.quartz.threadPool.threadCount", "10");
        properties.setProperty("org.quartz.threadPool.threadPriority", "5");
        // DataSource configuration (name must match org.quartz.jobStore.dataSource above).
        properties.setProperty("org.quartz.dataSource.quartzDataSource.driver", "org.h2.Driver");
        properties.setProperty("org.quartz.dataSource.quartzDataSource.URL", "jdbc:h2:mem:quartz");
        properties.setProperty("org.quartz.dataSource.quartzDataSource.user", "sa");
        properties.setProperty("org.quartz.dataSource.quartzDataSource.password", "");
        return properties;
    }
}

Quartz Job Implementations

package com.example.jobs.jobs;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class SampleJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(SampleJob.class);

    /**
     * Demonstration job: logs its key, simulates two seconds of work, then logs completion.
     * Interruption restores the thread's interrupt flag before rethrowing.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Executing sample background job: {}", context.getJobDetail().getKey());
            simulateWork();
            log.info("Sample job completed successfully");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new JobExecutionException("Job interrupted", e);
        } catch (Exception e) {
            log.error("Sample job failed", e);
            throw new JobExecutionException("Job execution failed", e);
        }
    }

    // Stand-in for real work: blocks for two seconds.
    private void simulateWork() throws InterruptedException {
        Thread.sleep(2000);
    }
}
@Component
public class EmailJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(EmailJob.class);

    /**
     * Reads "recipient" and "subject" from the job's data map and delegates
     * to {@link #sendEmail}. Any failure is wrapped in a JobExecutionException.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            JobDataMap data = context.getJobDetail().getJobDataMap();
            String to = data.getString("recipient");
            String subjectLine = data.getString("subject");
            log.info("Sending email to: {} with subject: {}", to, subjectLine);
            // Email sending logic
            sendEmail(to, subjectLine, data);
            log.info("Email sent successfully to: {}", to);
        } catch (Exception e) {
            log.error("Email job failed", e);
            throw new JobExecutionException("Email job execution failed", e);
        }
    }

    // Implement email sending logic here.
    // This could use JavaMail, Spring Mail, or other email services.
    private void sendEmail(String recipient, String subject, JobDataMap dataMap) {
    }
}
@Component
public class DataProcessingJob implements Job {

    private static final Logger logger = LoggerFactory.getLogger(DataProcessingJob.class);

    /**
     * Dispatches file processing based on the "processType" entry of the merged
     * job data map; unknown or missing types fall back to generic processing.
     *
     * Fix: Quartz's JobDataMap has no getString(key, default) overload — the original
     * two-argument call would not compile. The default is now applied explicitly,
     * which also keeps the switch null-safe when "processType" is absent.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            JobDataMap jobDataMap = context.getMergedJobDataMap();
            String filePath = jobDataMap.getString("filePath");
            String processType = jobDataMap.getString("processType");
            if (processType == null) {
                processType = "DEFAULT"; // missing key → generic processing path
            }
            logger.info("Starting data processing job for file: {}", filePath);
            // Process data based on type
            switch (processType) {
                case "CSV_IMPORT":
                    processCsvFile(filePath);
                    break;
                case "DATA_CLEANING":
                    cleanData(filePath);
                    break;
                case "REPORT_GENERATION":
                    generateReport(filePath);
                    break;
                default:
                    processFile(filePath);
            }
            logger.info("Data processing job completed for file: {}", filePath);
        } catch (Exception e) {
            logger.error("Data processing job failed", e);
            throw new JobExecutionException("Data processing job failed", e);
        }
    }

    // CSV processing logic
    private void processCsvFile(String filePath) {
        logger.info("Processing CSV file: {}", filePath);
    }

    // Data cleaning logic
    private void cleanData(String filePath) {
        logger.info("Cleaning data from file: {}", filePath);
    }

    // Report generation logic
    private void generateReport(String filePath) {
        logger.info("Generating report from file: {}", filePath);
    }

    // Default file processing
    private void processFile(String filePath) {
        logger.info("Processing file: {}", filePath);
    }
}

Quartz Job Management Service

package com.example.jobs.service;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;
@Service
public class QuartzJobService {

    private static final Logger logger = LoggerFactory.getLogger(QuartzJobService.class);

    // Quartz scheduler injected by Spring; all operations below delegate to it.
    private final Scheduler scheduler;

    public QuartzJobService(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    // Schedule a one-time job in group "scheduledJobs", identified by a random UUID.
    // Returns the generated job id (callers need it plus the group for pause/resume/delete).
    // NOTE(review): storeDurably() keeps the JobDetail after its single trigger fires —
    // presumably intentional; otherwise one-shot jobs accumulate in the job store.
    public String scheduleOneTimeJob(Class<? extends Job> jobClass, JobDataMap jobDataMap,
                                     LocalDateTime scheduleTime) {
        try {
            String jobId = UUID.randomUUID().toString();
            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(jobId, "scheduledJobs")
                    .usingJobData(jobDataMap)
                    .storeDurably()
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("trigger_" + jobId, "scheduledJobs")
                    // LocalDateTime is converted via the JVM's default zone — assumes the
                    // caller's wall-clock time is in the server zone; TODO confirm.
                    .startAt(Date.from(scheduleTime.atZone(ZoneId.systemDefault()).toInstant()))
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
            logger.info("Scheduled one-time job: {} for {}", jobId, scheduleTime);
            return jobId;
        } catch (SchedulerException e) {
            logger.error("Failed to schedule one-time job", e);
            throw new JobSchedulingException("Failed to schedule job", e);
        }
    }

    // Schedule a recurring job in group "recurringJobs" driven by a Quartz cron expression.
    public String scheduleRecurringJob(Class<? extends Job> jobClass, JobDataMap jobDataMap,
                                       String cronExpression) {
        try {
            String jobId = UUID.randomUUID().toString();
            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(jobId, "recurringJobs")
                    .usingJobData(jobDataMap)
                    .storeDurably()
                    .build();
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("trigger_" + jobId, "recurringJobs")
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
            logger.info("Scheduled recurring job: {} with cron: {}", jobId, cronExpression);
            return jobId;
        } catch (SchedulerException e) {
            logger.error("Failed to schedule recurring job", e);
            throw new JobSchedulingException("Failed to schedule recurring job", e);
        }
    }

    // Convenience wrapper: one-time job executed delayInSeconds from now.
    public String scheduleDelayedJob(Class<? extends Job> jobClass, JobDataMap jobDataMap,
                                     long delayInSeconds) {
        LocalDateTime scheduleTime = LocalDateTime.now().plusSeconds(delayInSeconds);
        return scheduleOneTimeJob(jobClass, jobDataMap, scheduleTime);
    }

    // Pause all triggers of the given job; throws JobNotFoundException if it does not exist.
    public void pauseJob(String jobId, String group) {
        try {
            JobKey jobKey = new JobKey(jobId, group);
            if (scheduler.checkExists(jobKey)) {
                scheduler.pauseJob(jobKey);
                logger.info("Paused job: {}", jobId);
            } else {
                throw new JobNotFoundException("Job not found: " + jobId);
            }
        } catch (SchedulerException e) {
            logger.error("Failed to pause job: {}", jobId, e);
            throw new JobManagementException("Failed to pause job", e);
        }
    }

    // Resume a previously paused job.
    public void resumeJob(String jobId, String group) {
        try {
            JobKey jobKey = new JobKey(jobId, group);
            if (scheduler.checkExists(jobKey)) {
                scheduler.resumeJob(jobKey);
                logger.info("Resumed job: {}", jobId);
            } else {
                throw new JobNotFoundException("Job not found: " + jobId);
            }
        } catch (SchedulerException e) {
            logger.error("Failed to resume job: {}", jobId, e);
            throw new JobManagementException("Failed to resume job", e);
        }
    }

    // Delete the job and, per Quartz semantics, its associated triggers.
    public void deleteJob(String jobId, String group) {
        try {
            JobKey jobKey = new JobKey(jobId, group);
            if (scheduler.checkExists(jobKey)) {
                scheduler.deleteJob(jobKey);
                logger.info("Deleted job: {}", jobId);
            } else {
                throw new JobNotFoundException("Job not found: " + jobId);
            }
        } catch (SchedulerException e) {
            logger.error("Failed to delete job: {}", jobId, e);
            throw new JobManagementException("Failed to delete job", e);
        }
    }

    // Fire the job immediately, independent of its scheduled triggers.
    public void triggerJob(String jobId, String group) {
        try {
            JobKey jobKey = new JobKey(jobId, group);
            if (scheduler.checkExists(jobKey)) {
                scheduler.triggerJob(jobKey);
                logger.info("Manually triggered job: {}", jobId);
            } else {
                throw new JobNotFoundException("Job not found: " + jobId);
            }
        } catch (SchedulerException e) {
            logger.error("Failed to trigger job: {}", jobId, e);
            throw new JobManagementException("Failed to trigger job", e);
        }
    }

    // Map the job's trigger state to a JobStatus.
    // NOTE(review): reconstructs the trigger key as "trigger_" + jobId, which only matches
    // triggers created by this service's schedule* methods — jobs registered elsewhere
    // (e.g. QuartzConfig beans) will report NONE.
    public JobStatus getJobStatus(String jobId, String group) {
        try {
            JobKey jobKey = new JobKey(jobId, group);
            if (!scheduler.checkExists(jobKey)) {
                throw new JobNotFoundException("Job not found: " + jobId);
            }
            Trigger.TriggerState state = scheduler.getTriggerState(
                    new TriggerKey("trigger_" + jobId, group)
            );
            return JobStatus.fromTriggerState(state);
        } catch (SchedulerException e) {
            logger.error("Failed to get job status: {}", jobId, e);
            throw new JobManagementException("Failed to get job status", e);
        }
    }

    // One-to-one projection of Quartz's Trigger.TriggerState.
    public enum JobStatus {
        NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED, NONE;

        public static JobStatus fromTriggerState(Trigger.TriggerState state) {
            return switch (state) {
                case NORMAL -> NORMAL;
                case PAUSED -> PAUSED;
                case COMPLETE -> COMPLETE;
                case ERROR -> ERROR;
                case BLOCKED -> BLOCKED;
                case NONE -> NONE;
            };
        }
    }

    // Custom Exceptions: unchecked wrappers so callers are not forced to handle
    // SchedulerException directly.
    public static class JobSchedulingException extends RuntimeException {
        public JobSchedulingException(String message) { super(message); }
        public JobSchedulingException(String message, Throwable cause) { super(message, cause); }
    }

    public static class JobManagementException extends RuntimeException {
        public JobManagementException(String message) { super(message); }
        public JobManagementException(String message, Throwable cause) { super(message, cause); }
    }

    public static class JobNotFoundException extends RuntimeException {
        public JobNotFoundException(String message) { super(message); }
    }
}

2. JobRunr - Modern Hangfire Alternative

JobRunr Dependencies

<properties>
<jobrunr.version>6.2.3</jobrunr.version>
</properties>
<dependencies>
<!-- JobRunr -->
<dependency>
<groupId>org.jobrunr</groupId>
<artifactId>jobrunr-spring-boot-starter</artifactId>
<version>${jobrunr.version}</version>
</dependency>
<!-- Storage (Redis example) -->
<dependency>
<groupId>org.jobrunr</groupId>
<artifactId>jobrunr-storage-redis</artifactId>
<version>${jobrunr.version}</version>
</dependency>
</dependencies>

JobRunr Configuration

# application.yml
# Fix: the snippet had lost all indentation, which makes it invalid YAML —
# nesting restored to match the org.jobrunr.* property structure.
org:
  jobrunr:
    background-job-server:
      enabled: true          # this node processes jobs (not just enqueues them)
    dashboard:
      enabled: true
      port: 8000             # web dashboard at http://localhost:8000
    database:
      type: redis
      host: localhost
      port: 6379
    jobs:
      default-number-of-retries: 3
      retry-back-off-time-seed: 3
    miscellaneous:
      allow-anonymous-data-usage: false

JobRunr Service Implementation

package com.example.jobs.service;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
@Service
public class JobRunrService {

    private static final Logger logger = LoggerFactory.getLogger(JobRunrService.class);

    private final JobScheduler jobScheduler;

    public JobRunrService(JobScheduler jobScheduler) {
        this.jobScheduler = jobScheduler;
    }

    // Enqueue a simple background job for immediate processing.
    // Fix: JobScheduler.enqueue returns a JobId, not a String — convert explicitly.
    public String enqueueSimpleJob(String input) {
        String jobId = jobScheduler.enqueue(() -> processSimpleJob(input)).toString();
        logger.info("Enqueued simple job with ID: {}", jobId);
        return jobId;
    }

    // Schedule a job for later execution (JobScheduler.schedule also returns JobId).
    public String scheduleJob(String input, LocalDateTime scheduleTime) {
        String jobId = jobScheduler.schedule(scheduleTime, () -> processScheduledJob(input)).toString();
        logger.info("Scheduled job with ID: {} for {}", jobId, scheduleTime);
        return jobId;
    }

    // Schedule a recurring job; scheduleRecurrently returns the recurring-job id as a String.
    public String scheduleRecurringJob(String jobName, String input, String cronExpression) {
        String jobId = jobScheduler.scheduleRecurrently(jobName, cronExpression, () -> processRecurringJob(input));
        logger.info("Scheduled recurring job: {} with cron: {}", jobName, cronExpression);
        return jobId;
    }

    // Job methods with @Job annotation for automatic retry and dashboard naming.
    @Job(name = "Process simple job", retries = 3)
    public void processSimpleJob(String input) {
        logger.info("Processing simple job with input: {}", input);
        // Simulate work
        try {
            Thread.sleep(2000);
            // Business logic here; "error" input deliberately fails to demonstrate retries.
            if ("error".equals(input)) {
                throw new RuntimeException("Simulated error for testing retries");
            }
            logger.info("Simple job completed successfully: {}", input);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Job interrupted", e);
        }
    }

    @Job(name = "Process scheduled job", retries = 2)
    public void processScheduledJob(String input) {
        logger.info("Processing scheduled job with input: {}", input);
        // Business logic for scheduled job
    }

    @Job(name = "Process recurring job")
    public void processRecurringJob(String input) {
        logger.info("Processing recurring job with input: {}", input);
        // Business logic for recurring job
    }

    // Multi-step job with progress reporting.
    // Fix: the original called jobScheduler.getJobById(...)/getJobProgress(...), which do
    // not exist on JobRunr's JobScheduler. Real dashboard progress is reported by adding a
    // JobContext parameter to the job method and using jobContext.progressBar(...); here we
    // keep the signature intact and report progress via logging.
    @Job(name = "Process file upload", retries = 2)
    public void processFileUpload(UUID jobId, String filePath, String fileType) {
        logger.info("Starting file upload processing: {}", filePath);
        try {
            updateProgress(jobId, 10, "Starting processing");
            // Step 1: Validate file
            validateFile(filePath, fileType);
            updateProgress(jobId, 30, "File validated");
            // Step 2: Process file content
            processFileContent(filePath);
            updateProgress(jobId, 70, "File content processed");
            // Step 3: Update database
            updateDatabase(filePath);
            updateProgress(jobId, 100, "Processing completed");
            logger.info("File upload processing completed: {}", filePath);
        } catch (Exception e) {
            logger.error("File upload processing failed: {}", filePath, e);
            throw new RuntimeException("File processing failed", e);
        }
    }

    // File validation logic
    private void validateFile(String filePath, String fileType) {
        logger.info("Validating file: {} of type: {}", filePath, fileType);
    }

    // File processing logic
    private void processFileContent(String filePath) {
        logger.info("Processing file content: {}", filePath);
    }

    // Database update logic
    private void updateDatabase(String filePath) {
        logger.info("Updating database for file: {}", filePath);
    }

    // Log-based progress placeholder; switch to JobContext.progressBar for dashboard visibility.
    private void updateProgress(UUID jobId, int progress, String message) {
        logger.info("Job {} progress: {}% - {}", jobId, progress, message);
    }

    // Enqueue an email job with concrete parameters captured in the lambda.
    public String enqueueEmailJob(String to, String subject, String body) {
        String jobId = jobScheduler.enqueue(() -> sendEmail(to, subject, body)).toString();
        logger.info("Enqueued email job with ID: {}", jobId);
        return jobId;
    }

    @Job(name = "Send email", retries = 3)
    public void sendEmail(String to, String subject, String body) {
        logger.info("Sending email to: {} with subject: {}", to, subject);
        try {
            // Email sending logic
            // emailService.send(to, subject, body);
            logger.info("Email sent successfully to: {}", to);
        } catch (Exception e) {
            logger.error("Failed to send email to: {}", to, e);
            throw new RuntimeException("Email sending failed", e);
        }
    }
}

3. Spring Batch - Batch Processing Framework

Spring Batch Configuration

package com.example.jobs.config;
import com.example.jobs.listener.JobCompletionListener;
import com.example.jobs.processor.DataItemProcessor;
import com.example.jobs.reader.DataItemReader;
import com.example.jobs.writer.DataItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    // NOTE(review): with Spring Boot 3.x, @EnableBatchProcessing disables Boot's batch
    // auto-configuration (JobRepository, JobLauncher, etc.) — verify those beans are
    // provided explicitly or drop the annotation.

    // Single-step job: processDataStep wrapped with a completion listener.
    @Bean
    public Job dataProcessingJob(JobRepository jobRepository, Step processDataStep,
                                 JobCompletionListener listener) {
        return new JobBuilder("dataProcessingJob", jobRepository)
                .listener(listener)
                .start(processDataStep)
                .build();
    }

    // Chunk-oriented step: reads/processes/writes Strings in transactions of 100 items.
    @Bean
    public Step processDataStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                                ItemReader<String> reader, ItemProcessor<String, String> processor,
                                ItemWriter<String> writer) {
        return new StepBuilder("processDataStep", jobRepository)
                .<String, String>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // NOTE(review): DataItemReader/DataItemProcessor/DataItemWriter are also annotated
    // @Component, so these @Bean methods register a SECOND instance of each — pick one
    // registration mechanism to avoid ambiguity.
    @Bean
    public ItemReader<String> itemReader() {
        return new DataItemReader();
    }

    @Bean
    public ItemProcessor<String, String> itemProcessor() {
        return new DataItemProcessor();
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return new DataItemWriter();
    }
}

Spring Batch Components

package com.example.jobs.reader;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@Component
public class DataItemReader implements ItemReader<String> {

    // Fixed in-memory data source, consumed one element per read() call.
    private final List<String> items = Arrays.asList(
            "data1", "data2", "data3", "data4", "data5",
            "data6", "data7", "data8", "data9", "data10"
    );

    // Index of the next element to hand out.
    private int position = 0;

    /** Returns the next item, or null once the backing list is exhausted (end of input). */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (position >= items.size()) {
            return null;
        }
        return items.get(position++);
    }
}
package com.example.jobs.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
@Component
public class DataItemProcessor implements ItemProcessor<String, String> {

    private static final Logger log = LoggerFactory.getLogger(DataItemProcessor.class);

    /**
     * Uppercases the item and appends "_PROCESSED", with a 100ms pause to
     * simulate per-item work. Logs before and after the transformation.
     */
    @Override
    public String process(String item) throws Exception {
        log.info("Processing item: {}", item);
        // Simulate processing
        Thread.sleep(100);
        // Transform data
        String result = item.toUpperCase() + "_PROCESSED";
        log.info("Processed item: {} -> {}", item, result);
        return result;
    }
}
package com.example.jobs.writer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class DataItemWriter implements ItemWriter<String> {
private static final Logger logger = LoggerFactory.getLogger(DataItemWriter.class);
@Override
public void write(List<? extends String> items) throws Exception {
logger.info("Writing {} items", items.size());
for (String item : items) {
logger.info("Writing item: {}", item);
// Simulate writing to database or file
Thread.sleep(50);
}
logger.info("Successfully wrote {} items", items.size());
}
}
package com.example.jobs.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionListener implements JobExecutionListener {

    private static final Logger logger = LoggerFactory.getLogger(JobCompletionListener.class);

    /** Logs the job name just before execution starts. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        logger.info("Starting job: {}", jobExecution.getJobInstance().getJobName());
    }

    /**
     * Logs the final status and timing statistics.
     *
     * Fix: in Spring Batch 5, getStartTime()/getEndTime() return LocalDateTime (no
     * getTime()), and either may be null (e.g. job never started or still running) —
     * the original subtraction would not compile and could NPE. Duration.between
     * with a null guard handles both.
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        logger.info("Completed job: {} with status: {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getStatus());
        // Log job statistics (duration only when both timestamps are available)
        java.time.LocalDateTime start = jobExecution.getStartTime();
        java.time.LocalDateTime end = jobExecution.getEndTime();
        if (start != null && end != null) {
            logger.info("Job statistics - Start: {}, End: {}, Duration: {}ms",
                    start, end, java.time.Duration.between(start, end).toMillis());
        } else {
            logger.info("Job statistics - Start: {}, End: {}", start, end);
        }
    }
}

Batch Job Service

package com.example.jobs.service;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Service
public class BatchJobService {

    private final JobLauncher jobLauncher;
    private final Job dataProcessingJob;

    public BatchJobService(JobLauncher jobLauncher, Job dataProcessingJob) {
        this.jobLauncher = jobLauncher;
        this.dataProcessingJob = dataProcessingJob;
    }

    /**
     * Launches the data-processing job. A current-time "timestamp" parameter makes each
     * run a distinct JobInstance (Batch refuses to re-run completed instances).
     *
     * Fix: the untyped new JobParameter(Object) constructors used originally were removed
     * in Spring Batch 5; JobParametersBuilder is the supported, typed replacement.
     */
    public JobExecution runDataProcessingJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();
            return jobLauncher.run(dataProcessingJob, jobParameters);
        } catch (JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            throw new BatchJobException("Failed to run batch job", e);
        }
    }

    /** Same launch, with explicit input/output file parameters for the step components. */
    public JobExecution runDataProcessingJobWithParameters(String inputFile, String outputFile) {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("inputFile", inputFile)
                    .addString("outputFile", outputFile)
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();
            return jobLauncher.run(dataProcessingJob, jobParameters);
        } catch (JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            throw new BatchJobException("Failed to run batch job with parameters", e);
        }
    }

    // Unchecked wrapper so callers are not forced to handle Batch's checked exceptions.
    public static class BatchJobException extends RuntimeException {
        public BatchJobException(String message) {
            super(message);
        }
        public BatchJobException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

4. Custom Implementation with Spring

Custom Job Framework

package com.example.jobs.framework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
public class CustomJobScheduler {

    private static final Logger logger = LoggerFactory.getLogger(CustomJobScheduler.class);

    // Registry of every job this scheduler knows about, keyed by generated job id.
    private final Map<String, JobInfo> scheduledJobs = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

    /**
     * Schedules a one-time job and returns its id.
     *
     * Fix: the original scheduled the task BEFORE putting its JobInfo into the map, so a
     * job with a very short delay could execute first and NPE on scheduledJobs.get(jobId).
     * The JobInfo is now registered first and the future attached afterwards; the lambda
     * also captures the JobInfo directly instead of re-fetching it from the map.
     * (ScheduledFuture/ThreadLocalRandom are fully qualified — the original referenced
     * them without imports.)
     */
    public String scheduleJob(Runnable job, long delay, TimeUnit timeUnit) {
        String jobId = generateJobId();
        JobInfo jobInfo = new JobInfo(jobId, job, JobStatus.SCHEDULED, null);
        scheduledJobs.put(jobId, jobInfo); // register before scheduling (race fix)
        java.util.concurrent.ScheduledFuture<?> future = executorService.schedule(() -> {
            try {
                logger.info("Executing scheduled job: {}", jobId);
                job.run();
                jobInfo.setStatus(JobStatus.COMPLETED);
                logger.info("Completed scheduled job: {}", jobId);
            } catch (Exception e) {
                logger.error("Scheduled job failed: {}", jobId, e);
                jobInfo.setStatus(JobStatus.FAILED);
                jobInfo.setError(e.getMessage());
            }
        }, delay, timeUnit);
        jobInfo.setFuture(future);
        logger.info("Scheduled job: {} with delay: {} {}", jobId, delay, timeUnit);
        return jobId;
    }

    /**
     * Schedules a fixed-rate recurring job; same register-before-schedule ordering as above.
     * Exceptions are caught inside the task so one failure does not cancel the schedule.
     * Fix: the original called jobInfo.setLastError(...), a method JobInfo never defined;
     * failures are recorded via setError instead.
     */
    public String scheduleRecurringJob(Runnable job, long initialDelay, long period, TimeUnit timeUnit) {
        String jobId = generateJobId();
        JobInfo jobInfo = new JobInfo(jobId, job, JobStatus.RECURRING, null);
        scheduledJobs.put(jobId, jobInfo);
        java.util.concurrent.ScheduledFuture<?> future = executorService.scheduleAtFixedRate(() -> {
            try {
                logger.info("Executing recurring job: {}", jobId);
                job.run();
                jobInfo.setLastExecution(LocalDateTime.now());
                logger.info("Completed recurring job execution: {}", jobId);
            } catch (Exception e) {
                logger.error("Recurring job failed: {}", jobId, e);
                jobInfo.setError(e.getMessage());
            }
        }, initialDelay, period, timeUnit);
        jobInfo.setFuture(future);
        logger.info("Scheduled recurring job: {} with period: {} {}", jobId, period, timeUnit);
        return jobId;
    }

    // Cancel a scheduled job without interrupting an in-flight execution (cancel(false)).
    public boolean cancelJob(String jobId) {
        JobInfo jobInfo = scheduledJobs.get(jobId);
        if (jobInfo != null && jobInfo.getFuture() != null) {
            boolean cancelled = jobInfo.getFuture().cancel(false);
            if (cancelled) {
                jobInfo.setStatus(JobStatus.CANCELLED);
                logger.info("Cancelled job: {}", jobId);
            }
            return cancelled;
        }
        return false;
    }

    // Returns the tracked JobInfo, or null if unknown/already cleaned up.
    public JobInfo getJobStatus(String jobId) {
        return scheduledJobs.get(jobId);
    }

    /**
     * Executes a job immediately (fire and forget), tracking it in the registry.
     * NOTE(review): @Async plus CompletableFuture.supplyAsync dispatches twice (Spring's
     * async executor, then ForkJoinPool.commonPool) — presumably one layer is enough;
     * kept as-is to preserve behavior.
     */
    @Async
    public CompletableFuture<String> executeAsync(Runnable job) {
        return CompletableFuture.supplyAsync(() -> {
            String jobId = generateJobId();
            JobInfo jobInfo = new JobInfo(jobId, job, JobStatus.RUNNING, null);
            scheduledJobs.put(jobId, jobInfo);
            try {
                logger.info("Starting async job: {}", jobId);
                job.run();
                jobInfo.setStatus(JobStatus.COMPLETED);
                logger.info("Completed async job: {}", jobId);
                return jobId;
            } catch (Exception e) {
                logger.error("Async job failed: {}", jobId, e);
                jobInfo.setStatus(JobStatus.FAILED);
                jobInfo.setError(e.getMessage());
                throw new RuntimeException("Async job failed", e);
            }
        });
    }

    // Housekeeping: drop terminal-state jobs from the registry every 5 minutes.
    @Scheduled(fixedRate = 300000) // Every 5 minutes
    public void cleanupCompletedJobs() {
        logger.info("Running job cleanup");
        scheduledJobs.entrySet().removeIf(entry -> {
            JobInfo jobInfo = entry.getValue();
            boolean shouldRemove = jobInfo.getStatus() == JobStatus.COMPLETED ||
                    jobInfo.getStatus() == JobStatus.FAILED ||
                    jobInfo.getStatus() == JobStatus.CANCELLED;
            if (shouldRemove) {
                logger.debug("Removing completed job: {}", entry.getKey());
            }
            return shouldRemove;
        });
    }

    // Placeholder maintenance hook driven by Spring's cron scheduling.
    @Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
    public void nightlyMaintenanceJob() {
        logger.info("Starting nightly maintenance job");
        // Perform maintenance tasks
        logger.info("Completed nightly maintenance job");
    }

    // Millis timestamp plus random 4-digit suffix (nextInt's upper bound is exclusive,
    // so the suffix range is 1000-9998).
    private String generateJobId() {
        return "JOB_" + System.currentTimeMillis() + "_" +
                java.util.concurrent.ThreadLocalRandom.current().nextInt(1000, 9999);
    }

    public enum JobStatus {
        SCHEDULED, RUNNING, COMPLETED, FAILED, CANCELLED, RECURRING
    }

    // Mutable tracking record for a scheduled job.
    public static class JobInfo {
        private final String jobId;
        private final Runnable job;
        private JobStatus status;
        // Not final: the future is attached after the JobInfo is registered in the map,
        // so the task can never observe an unregistered job.
        private java.util.concurrent.ScheduledFuture<?> future;
        private LocalDateTime lastExecution;
        private String error;

        public JobInfo(String jobId, Runnable job, JobStatus status, java.util.concurrent.ScheduledFuture<?> future) {
            this.jobId = jobId;
            this.job = job;
            this.status = status;
            this.future = future;
            this.lastExecution = LocalDateTime.now();
        }

        // Getters and setters
        public String getJobId() { return jobId; }
        public Runnable getJob() { return job; }
        public JobStatus getStatus() { return status; }
        public void setStatus(JobStatus status) { this.status = status; }
        public java.util.concurrent.ScheduledFuture<?> getFuture() { return future; }
        public void setFuture(java.util.concurrent.ScheduledFuture<?> future) { this.future = future; }
        public LocalDateTime getLastExecution() { return lastExecution; }
        public void setLastExecution(LocalDateTime lastExecution) { this.lastExecution = lastExecution; }
        public String getError() { return error; }
        public void setError(String error) { this.error = error; }
    }
}

5. Distributed Job Processing

Redis-based Distributed Job Queue

package com.example.jobs.distributed;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class DistributedJobService {

    private static final Logger logger = LoggerFactory.getLogger(DistributedJobService.class);

    /** Main FIFO work queue (LPUSH by producers, RPOPLPUSH by workers). */
    private static final String JOB_QUEUE = "jobs:queue";
    /** Holding list for jobs currently being worked on (reliable-queue pattern). */
    private static final String PROCESSING_QUEUE = "jobs:processing";
    /** Sorted set of delayed jobs, scored by execution time in epoch millis. */
    private static final String SCHEDULED_SET = "jobs:scheduled";
    /** Key prefix under which each job's JSON document is stored. */
    private static final String JOB_PREFIX = "job:";
    /** Key prefix for per-job processing locks. */
    private static final String LOCK_PREFIX = "lock:";
    /** Retention period for job documents in Redis. */
    private static final Duration JOB_TTL = Duration.ofHours(24);
    /** Number of concurrent queue consumers started by this instance. */
    private static final int WORKER_COUNT = 5;

    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;
    private final ScheduledExecutorService workerPool;

    public DistributedJobService(RedisTemplate<String, String> redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
        this.workerPool = Executors.newScheduledThreadPool(WORKER_COUNT);
        startWorkers();
    }

    /**
     * Stops the worker pool, waiting briefly for in-flight jobs to finish.
     *
     * <p>Fix: the original never shut the pool down, leaking its threads on
     * context shutdown. This is a backward-compatible addition; register it
     * as the bean's destroy method (e.g. {@code @Bean(destroyMethod = "shutdown")})
     * or call it from a lifecycle callback.
     */
    public void shutdown() {
        workerPool.shutdown();
        try {
            if (!workerPool.awaitTermination(10, TimeUnit.SECONDS)) {
                workerPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            workerPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues a job for immediate processing.
     *
     * @param jobRequest type + payload describing the work
     * @return the generated job id (random UUID)
     * @throws JobQueueException if the job cannot be serialized or stored
     */
    public String enqueueJob(JobRequest jobRequest) {
        String jobId = UUID.randomUUID().toString();
        Job job = new Job(jobId, jobRequest.getType(), jobRequest.getPayload(), JobStatus.PENDING);
        try {
            String jobJson = objectMapper.writeValueAsString(job);
            // Store the job document first so a worker that pops the id can
            // always resolve it, then publish the id onto the queue.
            redisTemplate.opsForValue().set(JOB_PREFIX + jobId, jobJson, JOB_TTL);
            redisTemplate.opsForList().leftPush(JOB_QUEUE, jobId);
            logger.info("Enqueued job: {} of type: {}", jobId, jobRequest.getType());
            return jobId;
        } catch (Exception e) {
            logger.error("Failed to enqueue job", e);
            throw new JobQueueException("Failed to enqueue job", e);
        }
    }

    /**
     * Schedules a job for execution after the given delay. The job id is put
     * into a sorted set scored by its target execution time; a background
     * task moves due ids onto the main queue.
     *
     * @throws JobQueueException if the job cannot be serialized or stored
     */
    public String scheduleJob(JobRequest jobRequest, long delay, TimeUnit timeUnit) {
        String jobId = UUID.randomUUID().toString();
        Job job = new Job(jobId, jobRequest.getType(), jobRequest.getPayload(), JobStatus.SCHEDULED);
        try {
            String jobJson = objectMapper.writeValueAsString(job);
            redisTemplate.opsForValue().set(JOB_PREFIX + jobId, jobJson, JOB_TTL);
            // Score = absolute execution time in epoch millis.
            long executionTime = System.currentTimeMillis() + timeUnit.toMillis(delay);
            redisTemplate.opsForZSet().add(SCHEDULED_SET, jobId, executionTime);
            logger.info("Scheduled job: {} for execution in {} {}", jobId, delay, timeUnit);
            return jobId;
        } catch (Exception e) {
            logger.error("Failed to schedule job", e);
            throw new JobQueueException("Failed to schedule job", e);
        }
    }

    /**
     * Returns the current job document, or {@code null} if the job is unknown
     * or its document has expired (documents live for {@link #JOB_TTL}).
     *
     * @throws JobQueueException on Redis/deserialization failure
     */
    public Job getJobStatus(String jobId) {
        try {
            String jobJson = redisTemplate.opsForValue().get(JOB_PREFIX + jobId);
            if (jobJson != null) {
                return objectMapper.readValue(jobJson, Job.class);
            }
            return null;
        } catch (Exception e) {
            logger.error("Failed to get job status: {}", jobId, e);
            throw new JobQueueException("Failed to get job status", e);
        }
    }

    /** Starts the queue consumers plus the scheduler and recovery tasks. */
    private void startWorkers() {
        for (int i = 0; i < WORKER_COUNT; i++) {
            workerPool.scheduleWithFixedDelay(this::processJobs, 0, 1, TimeUnit.SECONDS);
        }
        // Moves due delayed jobs onto the main queue.
        workerPool.scheduleWithFixedDelay(this::processScheduledJobs, 0, 1, TimeUnit.SECONDS);
        // Re-queues jobs abandoned mid-processing (e.g. a crashed worker).
        workerPool.scheduleWithFixedDelay(this::recoverStuckJobs, 0, 30, TimeUnit.SECONDS);
    }

    /** One consumer iteration: blocking-pop an id into the processing list. */
    private void processJobs() {
        try {
            // RPOPLPUSH keeps the id visible in PROCESSING_QUEUE until the
            // job finishes, so a crash between pop and completion is
            // recoverable by recoverStuckJobs().
            String jobId = redisTemplate.opsForList().rightPopAndLeftPush(JOB_QUEUE, PROCESSING_QUEUE, 5, TimeUnit.SECONDS);
            if (jobId != null) {
                processJob(jobId);
            }
        } catch (Exception e) {
            logger.error("Error in job processor", e);
        }
    }

    /**
     * Executes a single job under a short-lived Redis lock.
     *
     * <p>Fixes over the original:
     * <ul>
     *   <li>A job that throws is now also removed from {@code PROCESSING_QUEUE}.
     *       Previously a FAILED job stayed there and, once its lastUpdate aged
     *       past the recovery timeout, was re-enqueued and re-failed forever —
     *       an unbounded retry loop.</li>
     *   <li>If the job document is missing (TTL expired), the stale queue
     *       entry is dropped instead of lingering in the processing list.</li>
     * </ul>
     */
    private void processJob(String jobId) {
        String lockKey = LOCK_PREFIX + jobId;
        // NX lock with TTL so a crashed worker's lock self-expires.
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", Duration.ofSeconds(30));
        if (!Boolean.TRUE.equals(locked)) {
            return; // another worker owns this job
        }
        try {
            Job job = getJobStatus(jobId);
            if (job == null) {
                // Document expired or never existed: nothing to run, so drop
                // the orphaned id from the processing list.
                redisTemplate.opsForList().remove(PROCESSING_QUEUE, 1, jobId);
                return;
            }
            job.setStatus(JobStatus.PROCESSING);
            updateJob(job);
            logger.info("Processing job: {} of type: {}", jobId, job.getType());
            executeJob(job);
            job.setStatus(JobStatus.COMPLETED);
            updateJob(job);
            redisTemplate.opsForList().remove(PROCESSING_QUEUE, 1, jobId);
            logger.info("Completed job: {}", jobId);
        } catch (Exception e) {
            logger.error("Failed to process job: {}", jobId, e);
            Job job = getJobStatus(jobId);
            if (job != null) {
                job.setStatus(JobStatus.FAILED);
                job.setError(e.getMessage());
                updateJob(job);
            }
            // FAILED is terminal here: remove the id so recovery does not
            // re-enqueue it. Add explicit retry bookkeeping if retries are wanted.
            redisTemplate.opsForList().remove(PROCESSING_QUEUE, 1, jobId);
        } finally {
            redisTemplate.delete(lockKey);
        }
    }

    /**
     * Promotes delayed jobs whose execution time has arrived onto the main queue.
     *
     * <p>Fix: the original read the due ids and then pushed + removed them in
     * two steps, so two service instances could both enqueue the same id.
     * Here the ZSET {@code remove} acts as the claim — it returns the number
     * of members actually removed, so only the instance that wins the removal
     * enqueues the job.
     */
    private void processScheduledJobs() {
        try {
            long now = System.currentTimeMillis();
            List<String> jobIds = redisTemplate.opsForZSet().rangeByScore(SCHEDULED_SET, 0, now);
            if (jobIds != null && !jobIds.isEmpty()) {
                for (String jobId : jobIds) {
                    Long removed = redisTemplate.opsForZSet().remove(SCHEDULED_SET, jobId);
                    if (removed != null && removed > 0) {
                        redisTemplate.opsForList().leftPush(JOB_QUEUE, jobId);
                        logger.info("Moved scheduled job to queue: {}", jobId);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error in scheduled job processor", e);
        }
    }

    /**
     * Atomically (via a Lua script) scans the processing list and re-queues
     * jobs whose lock is gone and whose document has not been touched within
     * the 5-minute timeout — i.e. jobs abandoned by a dead worker.
     *
     * <p>NOTE(review): the script passes the key <em>prefixes</em> through
     * KEYS; on a Redis Cluster the computed keys may hash to other slots —
     * confirm this deployment is non-clustered before relying on it.
     */
    private void recoverStuckJobs() {
        try {
            long processingTimeout = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
            String luaScript = """
                local processingQueue = KEYS[1]
                local mainQueue = KEYS[2]
                local jobPrefix = KEYS[3]
                local lockPrefix = KEYS[4]
                local timeout = ARGV[1]
                local jobIds = redis.call('LRANGE', processingQueue, 0, -1)
                local recovered = 0
                for i, jobId in ipairs(jobIds) do
                local lockKey = lockPrefix .. jobId
                local jobKey = jobPrefix .. jobId
                -- Check if lock exists and is stale
                local lockTime = redis.call('TTL', lockKey)
                if lockTime == -2 then
                -- No lock, check job last update
                local jobData = redis.call('GET', jobKey)
                if jobData then
                local job = cjson.decode(jobData)
                if job.lastUpdate < timeout then
                -- Move back to main queue
                redis.call('LPUSH', mainQueue, jobId)
                redis.call('LREM', processingQueue, 1, jobId)
                recovered = recovered + 1
                end
                end
                end
                end
                return recovered
                """;
            DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
            Long recovered = redisTemplate.execute(script,
                List.of(PROCESSING_QUEUE, JOB_QUEUE, JOB_PREFIX, LOCK_PREFIX),
                String.valueOf(processingTimeout));
            if (recovered != null && recovered > 0) {
                logger.info("Recovered {} stuck jobs", recovered);
            }
        } catch (Exception e) {
            logger.error("Error in stuck job recovery", e);
        }
    }

    /** Dispatches a job to its type-specific handler. */
    private void executeJob(Job job) {
        switch (job.getType()) {
            case "EMAIL":
                sendEmail(job);
                break;
            case "DATA_PROCESSING":
                processData(job);
                break;
            case "REPORT_GENERATION":
                generateReport(job);
                break;
            default:
                throw new IllegalArgumentException("Unknown job type: " + job.getType());
        }
    }

    /** Handler stub for EMAIL jobs — replace with real mail integration. */
    private void sendEmail(Job job) {
        logger.info("Sending email from job: {}", job.getId());
    }

    /** Handler stub for DATA_PROCESSING jobs. */
    private void processData(Job job) {
        logger.info("Processing data from job: {}", job.getId());
    }

    /** Handler stub for REPORT_GENERATION jobs. */
    private void generateReport(Job job) {
        logger.info("Generating report from job: {}", job.getId());
    }

    /**
     * Persists the job document, refreshing lastUpdate — the timestamp the
     * stuck-job recovery script uses to decide whether a job is abandoned.
     */
    private void updateJob(Job job) {
        try {
            job.setLastUpdate(System.currentTimeMillis());
            String jobJson = objectMapper.writeValueAsString(job);
            redisTemplate.opsForValue().set(JOB_PREFIX + job.getId(), jobJson, JOB_TTL);
        } catch (Exception e) {
            logger.error("Failed to update job: {}", job.getId(), e);
        }
    }

    /** Client-facing description of work to enqueue: a type plus free-form payload. */
    public static class JobRequest {
        private String type;
        private Object payload;

        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        public Object getPayload() { return payload; }
        public void setPayload(Object payload) { this.payload = payload; }
    }

    /** The job document persisted as JSON in Redis under {@code job:<id>}. */
    public static class Job {
        private String id;
        private String type;
        private Object payload;
        private JobStatus status;
        private String error;
        // Epoch millis of the last persisted state change; read by the
        // recovery Lua script to detect abandoned jobs.
        private long lastUpdate;

        public Job() {} // required by Jackson

        public Job(String id, String type, Object payload, JobStatus status) {
            this.id = id;
            this.type = type;
            this.payload = payload;
            this.status = status;
            this.lastUpdate = System.currentTimeMillis();
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        public Object getPayload() { return payload; }
        public void setPayload(Object payload) { this.payload = payload; }
        public JobStatus getStatus() { return status; }
        public void setStatus(JobStatus status) { this.status = status; }
        public String getError() { return error; }
        public void setError(String error) { this.error = error; }
        public long getLastUpdate() { return lastUpdate; }
        public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; }
    }

    /** Lifecycle states of a distributed job. */
    public enum JobStatus {
        PENDING, SCHEDULED, PROCESSING, COMPLETED, FAILED
    }

    /** Unchecked wrapper for queue/storage failures; always carries the cause. */
    public static class JobQueueException extends RuntimeException {
        public JobQueueException(String message) { super(message); }
        public JobQueueException(String message, Throwable cause) { super(message, cause); }
    }
}

6. REST Controllers

Job Management Controller

package com.example.jobs.web;
import com.example.jobs.service.QuartzJobService;
import com.example.jobs.service.JobRunrService;
import com.example.jobs.service.BatchJobService;
import com.example.jobs.distributed.DistributedJobService;
import com.example.jobs.framework.CustomJobScheduler;
import org.springframework.batch.core.JobExecution;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.Map;
/**
 * REST facade exposing all five job-processing backends (Quartz, JobRunr,
 * Spring Batch, the Redis-based distributed queue, and the custom scheduler)
 * under a single /api/jobs namespace. Every endpoint delegates directly to
 * the corresponding service; no business logic lives here.
 */
@RestController
@RequestMapping("/api/jobs")
public class JobController {
// One injected service per backing framework.
private final QuartzJobService quartzJobService;
private final JobRunrService jobRunrService;
private final BatchJobService batchJobService;
private final DistributedJobService distributedJobService;
private final CustomJobScheduler customJobScheduler;
// Constructor injection of all five services.
public JobController(QuartzJobService quartzJobService, JobRunrService jobRunrService,
BatchJobService batchJobService, DistributedJobService distributedJobService,
CustomJobScheduler customJobScheduler) {
this.quartzJobService = quartzJobService;
this.jobRunrService = jobRunrService;
this.batchJobService = batchJobService;
this.distributedJobService = distributedJobService;
this.customJobScheduler = customJobScheduler;
}
// Quartz endpoints
/** Schedules a one-shot Quartz job at the requested time; returns its id. */
@PostMapping("/quartz/schedule")
public ResponseEntity<Map<String, String>> scheduleQuartzJob(@RequestBody ScheduleQuartzRequest request) {
String jobId = quartzJobService.scheduleOneTimeJob(
request.jobClass(),
request.jobDataMap(),
request.scheduleTime()
);
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "scheduled"));
}
/** Schedules a cron-driven recurring Quartz job; returns its id. */
@PostMapping("/quartz/recurring")
public ResponseEntity<Map<String, String>> scheduleRecurringQuartzJob(@RequestBody ScheduleRecurringRequest request) {
String jobId = quartzJobService.scheduleRecurringJob(
request.jobClass(),
request.jobDataMap(),
request.cronExpression()
);
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "scheduled"));
}
/** Fires an already-registered Quartz job immediately (jobId + group key). */
@PostMapping("/quartz/{jobId}/trigger")
public ResponseEntity<String> triggerQuartzJob(@PathVariable String jobId, @RequestParam String group) {
quartzJobService.triggerJob(jobId, group);
return ResponseEntity.ok("Job triggered successfully");
}
// JobRunr endpoints
/** Enqueues a fire-and-forget JobRunr job with the given input string. */
@PostMapping("/jobrunr/enqueue")
public ResponseEntity<Map<String, String>> enqueueJobRunrJob(@RequestBody JobRunrRequest request) {
String jobId = jobRunrService.enqueueSimpleJob(request.input());
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "enqueued"));
}
/** Schedules a JobRunr job for a specific wall-clock time. */
@PostMapping("/jobrunr/schedule")
public ResponseEntity<Map<String, String>> scheduleJobRunrJob(@RequestBody JobRunrScheduleRequest request) {
String jobId = jobRunrService.scheduleJob(request.input(), request.scheduleTime());
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "scheduled"));
}
// Batch endpoints
/** Launches the Spring Batch data-processing job and echoes its execution info. */
@PostMapping("/batch/run")
public ResponseEntity<Map<String, Object>> runBatchJob() {
JobExecution execution = batchJobService.runDataProcessingJob();
return ResponseEntity.ok(Map.of(
"executionId", execution.getId(),
"status", execution.getStatus().toString(),
"startTime", execution.getStartTime()
));
}
// Distributed job endpoints
/** Enqueues a job on the Redis-backed distributed queue. */
@PostMapping("/distributed/enqueue")
public ResponseEntity<Map<String, String>> enqueueDistributedJob(@RequestBody DistributedJobRequest request) {
String jobId = distributedJobService.enqueueJob(request);
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "enqueued"));
}
/** Returns the stored job document; body is null-ish if the job is unknown. */
@GetMapping("/distributed/{jobId}/status")
public ResponseEntity<DistributedJobService.Job> getDistributedJobStatus(@PathVariable String jobId) {
DistributedJobService.Job job = distributedJobService.getJobStatus(jobId);
return ResponseEntity.ok(job);
}
// Custom scheduler endpoints
/** Schedules a task on the custom in-process scheduler after a delay. */
@PostMapping("/custom/schedule")
public ResponseEntity<Map<String, String>> scheduleCustomJob(@RequestBody CustomJobRequest request) {
String jobId = customJobScheduler.scheduleJob(
request.task(),
request.delay(),
request.timeUnit()
);
return ResponseEntity.ok(Map.of("jobId", jobId, "status", "scheduled"));
}
/** Kicks off an async task. NOTE(review): the returned future is discarded,
so callers get no handle to await or cancel the work — confirm intended. */
@PostMapping("/custom/async")
public ResponseEntity<Map<String, String>> executeAsyncJob(@RequestBody CustomJobRequest request) {
var future = customJobScheduler.executeAsync(request.task());
return ResponseEntity.ok(Map.of("status", "async_job_started"));
}
// Request DTOs
// NOTE(review): `Class` below is a raw type; it should presumably be
// parameterized (e.g. Class<? extends org.quartz.Job>) to match the Quartz
// service's signature — confirm against QuartzJobService. Also, accepting an
// arbitrary Class from a request body is a deserialization/security hazard.
public record ScheduleQuartzRequest(Class jobClass, org.quartz.JobDataMap jobDataMap, LocalDateTime scheduleTime) {}
public record ScheduleRecurringRequest(Class jobClass, org.quartz.JobDataMap jobDataMap, String cronExpression) {}
public record JobRunrRequest(String input) {}
public record JobRunrScheduleRequest(String input, LocalDateTime scheduleTime) {}
public record DistributedJobRequest(String type, Object payload) {}
// NOTE(review): a Runnable cannot be deserialized from a JSON request body by
// standard Jackson — this DTO presumably only works for in-process callers;
// verify how these endpoints are actually invoked.
public record CustomJobRequest(Runnable task, Long delay, java.util.concurrent.TimeUnit timeUnit) {}
}

7. Monitoring and Management

Job Monitoring Service

package com.example.jobs.monitoring;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class JobMonitoringService {

    private static final Logger logger = LoggerFactory.getLogger(JobMonitoringService.class);

    private final MeterRegistry meterRegistry;
    // Gauge-backed per-(jobType, status) counters, lazily registered.
    private final ConcurrentHashMap<String, AtomicLong> jobCounters;
    // Single timer aggregating execution durations across all job types.
    private final Timer jobExecutionTimer;

    public JobMonitoringService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.jobCounters = new ConcurrentHashMap<>();
        this.jobExecutionTimer = Timer.builder("job.execution.time")
            .description("Time taken to execute jobs")
            .register(meterRegistry);
        initializeMetrics();
    }

    /** Pre-registers the aggregate counters so dashboards see them at zero. */
    private void initializeMetrics() {
        Counter.builder("job.total")
            .description("Total number of jobs")
            .register(meterRegistry);
        Counter.builder("job.success")
            .description("Number of successful jobs")
            .register(meterRegistry);
        Counter.builder("job.failure")
            .description("Number of failed jobs")
            .register(meterRegistry);
    }

    /** Records that a job began: bumps the "started" counter and stamps its start time. */
    public void recordJobStart(String jobType, String jobId) {
        logger.info("Job started - Type: {}, ID: {}", jobType, jobId);
        getJobCounter(jobType, "started").incrementAndGet();
        JobContext.setStartTime(jobId);
    }

    /** Records a successful completion, including its duration in the execution timer. */
    public void recordJobSuccess(String jobType, String jobId) {
        long duration = recordJobCompletion(jobType, jobId);
        logger.info("Job completed successfully - Type: {}, ID: {}, Duration: {}ms", jobType, jobId, duration);
        getJobCounter(jobType, "success").incrementAndGet();
        jobExecutionTimer.record(duration, TimeUnit.MILLISECONDS);
    }

    /** Records a failed completion with the error message. */
    public void recordJobFailure(String jobType, String jobId, String error) {
        long duration = recordJobCompletion(jobType, jobId);
        logger.error("Job failed - Type: {}, ID: {}, Duration: {}ms, Error: {}", jobType, jobId, duration, error);
        getJobCounter(jobType, "failure").incrementAndGet();
        getJobCounter(jobType, "error").incrementAndGet();
        // NOTE(review): tagging with the raw error message creates an
        // unbounded tag cardinality in the registry; consider mapping errors
        // to a small fixed set of categories before shipping this.
        Counter.builder("job.error.details")
            .tag("jobType", jobType)
            .tag("error", error)
            .register(meterRegistry)
            .increment();
    }

    /** Records a retry attempt for a job. */
    public void recordJobRetry(String jobType, String jobId, int retryCount) {
        logger.warn("Job retry - Type: {}, ID: {}, Retry: {}", jobType, jobId, retryCount);
        getJobCounter(jobType, "retry").incrementAndGet();
        Counter.builder("job.retry.count")
            .tag("jobType", jobType)
            .tag("retryCount", String.valueOf(retryCount))
            .register(meterRegistry)
            .increment();
    }

    /**
     * Shared completion bookkeeping: computes elapsed millis from the stored
     * start time (0 if none was recorded), clears the entry, and bumps the
     * "completed" counter.
     */
    private long recordJobCompletion(String jobType, String jobId) {
        Long startTime = JobContext.getStartTime(jobId);
        long duration = 0;
        if (startTime != null) {
            duration = System.currentTimeMillis() - startTime;
            JobContext.clearStartTime(jobId);
        }
        getJobCounter(jobType, "completed").incrementAndGet();
        return duration;
    }

    /**
     * Lazily registers and returns the gauge-backed counter for
     * (jobType, status). The AtomicLong is both the gauge's state holder and
     * the mutable counter we increment.
     */
    private AtomicLong getJobCounter(String jobType, String status) {
        String key = String.format("job.%s.%s", jobType, status);
        return jobCounters.computeIfAbsent(key, k -> meterRegistry.gauge(key, new AtomicLong(0)));
    }

    /** Snapshot of the counters for one job type, with a derived success rate. */
    public JobMetrics getJobMetrics(String jobType) {
        long started = getJobCounter(jobType, "started").get();
        long success = getJobCounter(jobType, "success").get();
        long failure = getJobCounter(jobType, "failure").get();
        long completed = getJobCounter(jobType, "completed").get();
        double successRate = started > 0 ? (double) success / started * 100 : 0;
        return new JobMetrics(jobType, started, success, failure, completed, successRate);
    }

    /**
     * Start-time tracking for in-flight jobs.
     *
     * <p>Fix: the original kept the map in a {@code ThreadLocal}, so a job
     * whose start and completion were recorded on different threads — the
     * normal case with executor-based processing — always reported a duration
     * of 0 and leaked the stale entry in the starting thread's map. A single
     * shared concurrent map keyed by jobId works across threads.
     */
    private static class JobContext {
        private static final ConcurrentHashMap<String, Long> START_TIMES = new ConcurrentHashMap<>();

        public static void setStartTime(String jobId) {
            START_TIMES.put(jobId, System.currentTimeMillis());
        }

        public static Long getStartTime(String jobId) {
            return START_TIMES.get(jobId);
        }

        public static void clearStartTime(String jobId) {
            START_TIMES.remove(jobId);
        }
    }

    /** Immutable metrics snapshot returned by {@link #getJobMetrics}. */
    public record JobMetrics(String jobType, long totalStarted, long totalSuccess,
        long totalFailure, long totalCompleted, double successRate) {}
}

8. Spring Boot Application

Main Application Class

package com.example.jobs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point for the job-processing application.
 * {@code @EnableAsync} activates {@code @Async} method execution and
 * {@code @EnableScheduling} activates {@code @Scheduled} triggers used by the
 * schedulers elsewhere in this project.
 */
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class JobProcessingApplication {
/** Boots the Spring application context. */
public static void main(String[] args) {
SpringApplication.run(JobProcessingApplication.class, args);
}
}

Summary

This comprehensive Java background job processing solution provides multiple alternatives to Hangfire:

Key Solutions:

  1. Quartz Scheduler: Enterprise-grade scheduling with persistence and clustering
  2. JobRunr: Modern Hangfire-like alternative with dashboard and progress tracking
  3. Spring Batch: Robust batch processing for large datasets
  4. Custom Implementation: Flexible custom job scheduler
  5. Distributed Processing: Redis-based distributed job queue

Features:

  • Job scheduling (one-time, recurring, delayed)
  • Job monitoring and metrics
  • Retry mechanisms and error handling
  • Distributed processing capabilities
  • REST API for job management
  • Comprehensive monitoring and dashboards

Use Cases:

  • Email sending and notifications
  • Data processing and ETL jobs
  • Report generation
  • System maintenance tasks
  • Real-time data synchronization
  • Batch processing operations

This implementation provides production-ready background job processing capabilities that rival or exceed Hangfire's features in the Java ecosystem.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper