Introduction
Quartz Scheduler is a powerful, enterprise-grade job scheduling library that enables complex scheduling requirements. This guide covers advanced features including clustering, persistence, misfire handling, and custom plugins.
Advanced Configuration
Cluster Configuration with JDBC JobStore
@Configuration
public class QuartzClusterConfig {

    /**
     * Builds a cluster-enabled Quartz {@code SchedulerFactoryBean} backed by a
     * JDBC job store ({@code JobStoreTX}) so multiple nodes can share one
     * scheduler state in the database.
     *
     * @param dataSource the pooled datasource Quartz persists its state into
     * @return a fully configured, auto-starting scheduler factory
     */
    @Bean
    public SchedulerFactoryBean clusterScheduler(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        Properties props = new Properties();

        // Cluster identity: a shared instanceName plus AUTO instanceId lets each
        // node generate a unique id while belonging to the same logical cluster.
        props.setProperty("org.quartz.scheduler.instanceName", "ClusterScheduler");
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        props.setProperty("org.quartz.scheduler.rmi.export", "false");
        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");

        // Worker thread pool for job execution.
        props.setProperty("org.quartz.threadPool.class",
                "org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "10");
        props.setProperty("org.quartz.threadPool.threadPriority", "5");

        // JDBC job store with clustering enabled. checkinInterval is the
        // heartbeat period (ms); misfireThreshold is how late a trigger may
        // fire before it is considered misfired (ms).
        props.setProperty("org.quartz.jobStore.class",
                "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.setProperty("org.quartz.jobStore.driverDelegateClass",
                "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        props.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        props.setProperty("org.quartz.jobStore.isClustered", "true");
        props.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000");
        // NOTE(review): useProperties=true forces every JobDataMap value to be a
        // String; jobs that put ints/longs into the map will fail to persist.
        // Confirm all jobs in this deployment store String-only job data.
        props.setProperty("org.quartz.jobStore.useProperties", "true");
        props.setProperty("org.quartz.jobStore.misfireThreshold", "60000");
        props.setProperty("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");

        // BUG FIX: the original set "org.quartz.jobStore.dataSource=quartzDataSource",
        // but no "org.quartz.dataSource.quartzDataSource.*" properties were ever
        // defined, so Quartz itself could not resolve that name. Spring's
        // factory.setDataSource(...) below registers the DataSource with the job
        // store directly, making the property both wrong and unnecessary.
        factory.setQuartzProperties(props);
        factory.setDataSource(dataSource);
        factory.setAutoStartup(true);
        factory.setWaitForJobsToCompleteOnShutdown(true);
        factory.setOverwriteExistingJobs(true);

        // Register a global job listener that observes every job execution.
        factory.setGlobalJobListeners(new CustomGlobalJobListener());
        return factory;
    }

    /**
     * Hikari-pooled PostgreSQL datasource for the Quartz tables.
     *
     * NOTE(review): credentials are hardcoded here for illustration only —
     * in production these must come from externalized, secret-managed
     * configuration, never source code.
     */
    @Bean
    public DataSource quartzDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName("org.postgresql.Driver");
        dataSource.setJdbcUrl("jdbc:postgresql://localhost:5432/quartz");
        dataSource.setUsername("quartz");
        dataSource.setPassword("quartz");
        dataSource.setMaximumPoolSize(10);
        dataSource.setMinimumIdle(2);
        return dataSource;
    }
}
Advanced Job Store Configuration
@Component
public class AdvancedQuartzConfig {

    /**
     * Property set for an in-memory (non-persistent) RAMJobStore scheduler.
     *
     * BUG FIX: the original returned only jobStore properties, but
     * StdSchedulerFactory.initialize(Properties) REPLACES the built-in
     * defaults entirely — without a thread pool and instance name the
     * factory throws at initialization. The required entries are added here.
     *
     * @return a self-sufficient property set usable with StdSchedulerFactory
     */
    public Properties createRAMJobStoreConfig() {
        Properties props = new Properties();
        // Scheduler identity (required when initializing from scratch).
        props.setProperty("org.quartz.scheduler.instanceName", "RAMScheduler");
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        // Thread pool is mandatory; SimpleThreadPool with a small worker count.
        props.setProperty("org.quartz.threadPool.class",
                "org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "5");
        // RAMJobStore: fast, but all state is lost on JVM shutdown.
        props.setProperty("org.quartz.jobStore.class",
                "org.quartz.simpl.RAMJobStore");
        props.setProperty("org.quartz.jobStore.misfireThreshold", "60000");
        return props;
    }

    /**
     * Property set for a Terracotta-backed job store (distributed clustering
     * without a relational database).
     *
     * NOTE(review): like the RAM config originally did, this set omits the
     * thread-pool properties — it is only usable merged into a fuller
     * configuration. Confirm intended usage before initializing a factory
     * from it directly.
     */
    public Properties createTerracottaJobStoreConfig() {
        Properties props = new Properties();
        props.setProperty("org.quartz.jobStore.class",
                "org.terracotta.quartz.TerracottaJobStore");
        props.setProperty("org.quartz.jobStore.tcConfigUrl", "localhost:9510");
        return props;
    }

    /**
     * Programmatically creates a RAM-store scheduler and attaches the custom
     * job, trigger, and scheduler listeners.
     *
     * @throws SchedulerException if factory initialization or listener
     *         registration fails
     */
    public Scheduler createCustomScheduler() throws SchedulerException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(createRAMJobStoreConfig());
        Scheduler scheduler = factory.getScheduler();
        // Listener registration: listeners observe, they do not alter scheduling.
        scheduler.getListenerManager().addJobListener(new CustomJobListener());
        scheduler.getListenerManager().addTriggerListener(new CustomTriggerListener());
        scheduler.getListenerManager().addSchedulerListener(new CustomSchedulerListener());
        return scheduler;
    }
}
Advanced Job Types
Stateful Jobs with Data Persistence
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class StatefulDataProcessingJob implements Job {

    private static final Logger logger = LoggerFactory.getLogger(StatefulDataProcessingJob.class);

    // JobDataMap keys carried across executions (@PersistJobDataAfterExecution
    // writes the map back after each run; @DisallowConcurrentExecution prevents
    // two instances from racing on it).
    public static final String PROCESSED_COUNT = "processedCount";
    public static final String LAST_PROCESSED_ID = "lastProcessedId";

    /**
     * Processes records incrementally, resuming after the last id handled by
     * the previous execution, then persists the updated cursor in the job data.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        // BUG FIX: the original called getInt/getLong unconditionally, which
        // NPEs on the very first execution when the keys are not yet present.
        // Default to zero so a fresh job starts from the beginning.
        int processedCount = jobDataMap.containsKey(PROCESSED_COUNT)
                ? jobDataMap.getInt(PROCESSED_COUNT) : 0;
        long lastProcessedId = jobDataMap.containsKey(LAST_PROCESSED_ID)
                ? jobDataMap.getLong(LAST_PROCESSED_ID) : 0L;
        try {
            // Process data starting from the last processed id.
            List<DataRecord> records = fetchRecordsAfter(lastProcessedId);
            for (DataRecord record : records) {
                processRecord(record);
                lastProcessedId = Math.max(lastProcessedId, record.getId());
                processedCount++;
            }
            // Persist the cursor for the next execution.
            jobDataMap.put(PROCESSED_COUNT, processedCount);
            jobDataMap.put(LAST_PROCESSED_ID, lastProcessedId);
            logger.info("Processed {} records. Total: {}", records.size(), processedCount);
        } catch (Exception e) {
            logger.error("Error processing data", e);
            handleFailure(context, e);
        }
    }

    /** Fetches records with id greater than the given cursor. Stubbed here. */
    private List<DataRecord> fetchRecordsAfter(Long lastProcessedId) {
        // Implementation to fetch records from database
        return Collections.emptyList();
    }

    /** Processes a single record. Stubbed here. */
    private void processRecord(DataRecord record) {
        // Process individual record
    }

    /**
     * Either re-fires the job immediately (bounded by {@link #shouldRetry})
     * or signals a permanent failure.
     *
     * BUG FIX: this method throws the checked JobExecutionException, so it
     * must declare it — the original did not and would not compile.
     *
     * @throws JobExecutionException always — either a refire request or a
     *         permanent failure
     */
    private void handleFailure(JobExecutionContext context, Exception e)
            throws JobExecutionException {
        if (shouldRetry(context)) {
            JobExecutionException jobEx = new JobExecutionException(e);
            jobEx.setRefireImmediately(true);
            throw jobEx;
        } else {
            throw new JobExecutionException("Permanent failure", e, false);
        }
    }

    /** Retries up to 3 immediate refires, tracked by Quartz's refire count. */
    private boolean shouldRetry(JobExecutionContext context) {
        return context.getRefireCount() < 3;
    }
}
Chained Jobs with Dependencies
@Component
public class JobChainingService {

    private final Scheduler scheduler;

    public JobChainingService(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Schedules a sequence of jobs under a common chain name.
     *
     * NOTE(review): this schedules jobs on staggered fixed delays, NOT on
     * actual completion of the previous job. True completion-driven chaining
     * requires a JobListener (e.g. Quartz's JobChainingJobListener) that
     * triggers the next job from jobWasExecuted. Confirm which semantics the
     * callers expect.
     *
     * @param chainName logical name shared by all jobs/triggers in the chain
     * @param jobs      ordered job definitions
     * @throws SchedulerException if any job cannot be scheduled
     */
    public void scheduleJobChain(String chainName, List<JobDefinition> jobs)
            throws SchedulerException {
        JobDetail previousJob = null;
        for (int i = 0; i < jobs.size(); i++) {
            JobDefinition jobDef = jobs.get(i);
            JobDetail job = createJobDetail(jobDef, chainName, i);
            Trigger trigger = createTrigger(jobDef, chainName, i, previousJob);
            scheduler.scheduleJob(job, trigger);
            previousJob = job;
        }
    }

    /** Builds a durable JobDetail named {@code chainName_job_<index>}. */
    private JobDetail createJobDetail(JobDefinition jobDef, String chainName, int index) {
        return JobBuilder.newJob(jobDef.getJobClass())
                .withIdentity(chainName + "_job_" + index, "job_chains")
                .usingJobData(jobDef.getJobData())
                .storeDurably()
                .build();
    }

    /**
     * Builds the trigger for position {@code index} in the chain: the first
     * job starts immediately, later jobs start on a delay proportional to
     * their position.
     *
     * BUG FIX: the original attached {@code modifiedByCalendar("chain_calendar")}
     * to non-first triggers, but no calendar of that name is ever registered
     * with the scheduler — scheduleJob would throw for every chain of length
     * greater than one. The calendar reference is removed.
     */
    private Trigger createTrigger(JobDefinition jobDef, String chainName,
                                  int index, JobDetail previousJob) {
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger()
                .withIdentity(chainName + "_trigger_" + index, "job_chains");
        if (index == 0) {
            // First job in the chain fires right away.
            triggerBuilder.startNow();
        } else {
            // Stagger subsequent jobs by 10 seconds per chain position so
            // they start in order (a heuristic, not a completion guarantee).
            triggerBuilder.startAt(
                    DateBuilder.futureDate(10 * index, DateBuilder.IntervalUnit.SECOND));
        }
        return triggerBuilder.build();
    }

    /** Immutable pairing of a job class with its initial job data. */
    public static class JobDefinition {
        private final Class<? extends Job> jobClass;
        private final JobDataMap jobData;

        public JobDefinition(Class<? extends Job> jobClass, JobDataMap jobData) {
            this.jobClass = jobClass;
            this.jobData = jobData;
        }

        public Class<? extends Job> getJobClass() { return jobClass; }
        public JobDataMap getJobData() { return jobData; }
    }
}
Custom Triggers and Calendars
Custom Business Day Trigger
public class BusinessDayTrigger implements Trigger {

    private static final Logger logger = LoggerFactory.getLogger(BusinessDayTrigger.class);

    private final TriggerKey triggerKey;
    private final Date startTime;
    private final Date endTime;
    private final int intervalMinutes;
    private final BusinessCalendar businessCalendar;
    private Date nextFireTime;

    /**
     * A trigger that fires every {@code intervalMinutes} but only on business
     * days and within business hours, as defined by the supplied
     * {@link BusinessCalendar}.
     *
     * NOTE(review): this class mixes {@code org.quartz.Calendar} (interface
     * methods) with {@code java.util.Calendar} (date arithmetic); the latter
     * is fully qualified below because both cannot be imported simply.
     */
    public BusinessDayTrigger(String triggerName, String group,
                              Date startTime, Date endTime,
                              int intervalMinutes, BusinessCalendar businessCalendar) {
        this.triggerKey = new TriggerKey(triggerName, group);
        this.startTime = startTime;
        this.endTime = endTime;
        this.intervalMinutes = intervalMinutes;
        this.businessCalendar = businessCalendar;
        this.nextFireTime = startTime;
    }

    @Override
    public Date getNextFireTime() {
        return nextFireTime;
    }

    @Override
    public Date getPreviousFireTime() {
        // Not tracked by this implementation.
        return null;
    }

    /**
     * Finds the first fire time strictly after {@code afterTime} that falls
     * on a business day within business hours, or {@code null} if none is
     * found within one year.
     */
    @Override
    public Date getFireTimeAfter(Date afterTime) {
        if (afterTime == null) {
            afterTime = new Date();
        }
        java.util.Calendar calendar = java.util.Calendar.getInstance();
        calendar.setTime(afterTime);

        // BUG FIX: the original loop guard called
        // DateBuilder.nextGivenSecondDate(new Date(), 365*24*60*60), but that
        // method's second argument must be 0-59 — it would throw instead of
        // bounding the search. Compute an explicit one-year horizon instead.
        java.util.Calendar horizon = java.util.Calendar.getInstance();
        horizon.setTime(afterTime);
        horizon.add(java.util.Calendar.YEAR, 1);
        Date searchLimit = horizon.getTime();

        while (true) {
            // Advance by one interval.
            calendar.add(java.util.Calendar.MINUTE, intervalMinutes);
            Date candidate = calendar.getTime();

            // Past the configured end of day: jump to 9 AM on the next
            // business day.
            if (candidate.after(endTime)) {
                candidate = businessCalendar.getNextBusinessDay(candidate);
                calendar.setTime(candidate);
                calendar.set(java.util.Calendar.HOUR_OF_DAY, 9);
                calendar.set(java.util.Calendar.MINUTE, 0);
                candidate = calendar.getTime();
            }

            if (businessCalendar.isBusinessDay(candidate)
                    && businessCalendar.isBusinessHours(candidate)) {
                return candidate;
            }

            // Prevent an unbounded search.
            if (calendar.getTime().after(searchLimit)) {
                return null;
            }
        }
    }

    // NOTE(review): computeFirstFireTime belongs to OperableTrigger, not
    // Trigger, so @Override is omitted here — confirm whether this class
    // should implement OperableTrigger instead.
    public Date computeFirstFireTime(Calendar calendar) {
        return getFireTimeAfter(startTime);
    }

    // Other required Trigger interface methods...

    @Override
    public TriggerKey getKey() { return triggerKey; }

    @Override
    public JobKey getJobKey() {
        // NOTE(review): hardcoded association — confirm the real job binding.
        return new JobKey("businessJob", "businessGroup");
    }

    @Override
    public String getDescription() {
        return "Business Day Trigger: " + triggerKey.toString();
    }

    // NOTE(review): Trigger exposes getCalendarName(), not getCalendar();
    // @Override is omitted because this is not an interface method.
    public Calendar getCalendar() { return null; }

    @Override
    public JobDataMap getJobDataMap() { return new JobDataMap(); }

    @Override
    public int getPriority() { return Trigger.DEFAULT_PRIORITY; }

    @Override
    public boolean mayFireAgain() { return nextFireTime != null; }

    @Override
    public TriggerBuilder<? extends Trigger> getTriggerBuilder() {
        return TriggerBuilder.newTrigger();
    }

    @Override
    public ScheduleBuilder<? extends Trigger> getScheduleBuilder() {
        return null; // Custom triggers may not use standard schedule builders.
    }

    @Override
    public int compareTo(Trigger other) {
        return getNextFireTime().compareTo(other.getNextFireTime());
    }
}
@Component
public class BusinessCalendar implements Calendar {

    // BUG FIX: this class implements org.quartz.Calendar, so unqualified
    // references to Calendar.SATURDAY / Calendar.getInstance() resolved to
    // the Quartz interface and did not compile. All java.util.Calendar usage
    // is now fully qualified.
    private final Set<Integer> weekends =
            Set.of(java.util.Calendar.SATURDAY, java.util.Calendar.SUNDAY);
    private final Set<Date> holidays = new HashSet<>();

    public BusinessCalendar() {
        // Initialize holidays
        initializeHolidays();
    }

    /** A timestamp is included when it falls on a business day, 9 AM-5 PM. */
    @Override
    public boolean isTimeIncluded(long timeStamp) {
        Date date = new Date(timeStamp);
        return isBusinessDay(date) && isBusinessHours(date);
    }

    /**
     * Scans forward in one-minute steps until an included time is found.
     * Bounded in practice: the next business morning is at most a few days
     * away.
     */
    @Override
    public long getNextIncludedTime(long timeStamp) {
        java.util.Calendar calendar = java.util.Calendar.getInstance();
        calendar.setTime(new Date(timeStamp));
        while (true) {
            calendar.add(java.util.Calendar.MINUTE, 1);
            if (isTimeIncluded(calendar.getTimeInMillis())) {
                return calendar.getTimeInMillis();
            }
        }
    }

    /**
     * True when the date is neither a weekend nor a registered holiday.
     *
     * NOTE(review): holidays.contains(date) compares exact millisecond
     * timestamps, so a holiday only matches if stored with the identical
     * time-of-day — confirm holidays are normalized (e.g. to midnight)
     * before this check is relied on.
     */
    public boolean isBusinessDay(Date date) {
        java.util.Calendar cal = java.util.Calendar.getInstance();
        cal.setTime(date);
        int dayOfWeek = cal.get(java.util.Calendar.DAY_OF_WEEK);
        return !weekends.contains(dayOfWeek) && !holidays.contains(date);
    }

    /** Business hours are 9 AM (inclusive) to 5 PM (exclusive). */
    public boolean isBusinessHours(Date date) {
        java.util.Calendar cal = java.util.Calendar.getInstance();
        cal.setTime(date);
        int hour = cal.get(java.util.Calendar.HOUR_OF_DAY);
        return hour >= 9 && hour < 17;
    }

    /** Returns the first business day strictly after the given date. */
    public Date getNextBusinessDay(Date date) {
        java.util.Calendar cal = java.util.Calendar.getInstance();
        cal.setTime(date);
        do {
            cal.add(java.util.Calendar.DAY_OF_MONTH, 1);
        } while (!isBusinessDay(cal.getTime()));
        return cal.getTime();
    }

    private void initializeHolidays() {
        // Add holiday dates.
        // This would typically load from a database or configuration.
    }

    @Override
    public String getDescription() {
        return "Business Calendar (Mon-Fri, 9AM-5PM, excluding holidays)";
    }

    @Override
    public Calendar getBaseCalendar() {
        return null;
    }

    @Override
    public void setBaseCalendar(Calendar baseCalendar) {
        // Not implemented — this calendar does not compose with a base.
    }

    /**
     * Quartz's Calendar contract expects instances to be cloneable
     * (TODO confirm for the Quartz version in use). Produces an independent
     * copy sharing no mutable state.
     */
    @Override
    public Object clone() {
        BusinessCalendar copy = new BusinessCalendar();
        copy.holidays.clear();
        copy.holidays.addAll(this.holidays);
        return copy;
    }
}
Advanced Misfire Handling
Custom Misfire Instructions
public class SmartMisfireHandlingJob implements Job {

    private static final Logger logger = LoggerFactory.getLogger(SmartMisfireHandlingJob.class);

    // Delay (ms) beyond which an execution is treated as misfired.
    // BUG FIX: the original compared the delay against
    // trigger.getMisfireInstruction(), which is an instruction *code*
    // (a small constant like -1, 0, 1...), not a time threshold — making the
    // misfire check meaningless. Mirrors org.quartz.jobStore.misfireThreshold.
    private static final long MISFIRE_DELAY_THRESHOLD_MS = 60_000L;

    /**
     * Detects whether this firing is significantly delayed and, if so,
     * dispatches to instruction-specific misfire handling instead of the
     * normal business logic.
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Trigger trigger = context.getTrigger();
        if (trigger.getPreviousFireTime() != null &&
                isMisfired(context)) {
            handleMisfire(context);
            return;
        }
        // Normal execution path.
        performBusinessLogic(context);
    }

    /** True when actual fire time lags the scheduled time past the threshold. */
    private boolean isMisfired(JobExecutionContext context) {
        Date scheduledFireTime = context.getScheduledFireTime();
        Date actualFireTime = context.getFireTime();
        if (scheduledFireTime == null || actualFireTime == null) {
            return false;
        }
        long delay = actualFireTime.getTime() - scheduledFireTime.getTime();
        return delay > MISFIRE_DELAY_THRESHOLD_MS;
    }

    /** Branches on the trigger's configured misfire instruction. */
    private void handleMisfire(JobExecutionContext context) {
        Trigger trigger = context.getTrigger();
        int misfireInstruction = trigger.getMisfireInstruction();
        switch (misfireInstruction) {
            case SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW:
                logger.info("Misfire detected - firing now");
                performBusinessLogic(context);
                break;
            case SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT:
                logger.info("Misfire detected - rescheduling with existing repeat count");
                rescheduleTrigger(context);
                break;
            case SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT:
                logger.info("Misfire detected - rescheduling with remaining repeat count");
                rescheduleWithRemainingCount(context);
                break;
            default:
                logger.warn("Unhandled misfire instruction: {}", misfireInstruction);
                performBusinessLogic(context);
        }
    }

    /** The actual work; sleeps to simulate a non-trivial task. */
    private void performBusinessLogic(JobExecutionContext context) {
        logger.info("Executing business logic for job: {}",
                context.getJobDetail().getKey());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Replaces the trigger with an identical one starting immediately. */
    private void rescheduleTrigger(JobExecutionContext context) {
        try {
            Trigger oldTrigger = context.getTrigger();
            Trigger newTrigger = oldTrigger.getTriggerBuilder()
                    .startNow()
                    .build();
            context.getScheduler().rescheduleJob(oldTrigger.getKey(), newTrigger);
        } catch (SchedulerException e) {
            logger.error("Failed to reschedule trigger", e);
        }
    }

    private void rescheduleWithRemainingCount(JobExecutionContext context) {
        // Implementation for rescheduling with remaining repeat count.
    }
}
@Component
public class MisfireHandler {

    // BUG FIX: `logger` was used but never declared in the original.
    private static final Logger logger = LoggerFactory.getLogger(MisfireHandler.class);

    private final Scheduler scheduler;

    public MisfireHandler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Scans the monitoring job's triggers and re-applies misfire handling to
     * any whose next fire time has already passed.
     *
     * BUG FIX: Scheduler.getTriggersOfJob returns {@code List<? extends Trigger>};
     * the original assigned it to {@code List<Trigger>}, which does not compile.
     *
     * @throws SchedulerException if trigger lookup fails
     */
    public void handleMisfiredTriggers() throws SchedulerException {
        List<? extends Trigger> misfiredTriggers = scheduler.getTriggersOfJob(
                JobKey.jobKey("monitoringJob", "system"));
        for (Trigger trigger : misfiredTriggers) {
            if (trigger instanceof OperableTrigger) {
                OperableTrigger operableTrigger = (OperableTrigger) trigger;
                // A next fire time in the past means the trigger misfired.
                if (operableTrigger.getNextFireTime() != null &&
                        operableTrigger.getNextFireTime().before(new Date())) {
                    updateMisfiredTrigger(operableTrigger);
                }
            }
        }
    }

    /**
     * Applies the trigger's own misfire policy (null calendar = no calendar
     * constraints) and reschedules it if a future fire time remains.
     */
    private void updateMisfiredTrigger(OperableTrigger trigger) {
        trigger.updateAfterMisfire(null);
        if (trigger.getNextFireTime() != null) {
            try {
                scheduler.rescheduleJob(trigger.getKey(), trigger);
            } catch (SchedulerException e) {
                logger.error("Failed to reschedule misfired trigger: {}", trigger.getKey(), e);
            }
        }
    }
}
Custom Plugins and Listeners
Advanced Job Listener
@Component
public class AdvancedJobListener implements JobListener {

    private static final Logger logger = LoggerFactory.getLogger(AdvancedJobListener.class);

    private final MetricsService metricsService;
    private final AlertService alertService;

    // BUG FIX: the original overlap check read context.getJobRunTime() from
    // jobToBeExecuted, where it is always -1 (run time is only known AFTER
    // execution) — the check could never fire. We instead remember each job's
    // last observed run time here and compare against it on the next start.
    private final java.util.concurrent.ConcurrentHashMap<JobKey, Long> lastRunTimes =
            new java.util.concurrent.ConcurrentHashMap<>();

    public AdvancedJobListener(MetricsService metricsService, AlertService alertService) {
        this.metricsService = metricsService;
        this.alertService = alertService;
    }

    @Override
    public String getName() {
        return "AdvancedJobListener";
    }

    /** Records start metrics and warns when the job may overlap itself. */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        JobDetail jobDetail = context.getJobDetail();
        logger.info("Job {} is about to be executed", jobDetail.getKey());
        metricsService.recordJobStart(
                jobDetail.getKey().getName(),
                jobDetail.getKey().getGroup(),
                context.getFireTime()
        );
        if (isJobOverlapping(context)) {
            logger.warn("Job {} might be overlapping with previous execution",
                    jobDetail.getKey());
        }
    }

    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        logger.info("Job {} execution was vetoed", context.getJobDetail().getKey());
    }

    /**
     * Records completion metrics, alerts on failure, and drives the retry /
     * escalation decision.
     */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        JobDetail jobDetail = context.getJobDetail();
        long executionTime = context.getJobRunTime();
        // Remember how long this job ran so the next start can detect overlap.
        lastRunTimes.put(jobDetail.getKey(), executionTime);
        metricsService.recordJobCompletion(
                jobDetail.getKey().getName(),
                jobDetail.getKey().getGroup(),
                executionTime,
                jobException != null
        );
        if (jobException != null) {
            logger.error("Job {} failed with exception", jobDetail.getKey(), jobException);
            alertService.sendJobFailureAlert(
                    jobDetail.getKey().toString(),
                    jobException.getMessage(),
                    context.getRefireCount()
            );
            handleJobFailure(context, jobException);
        } else {
            logger.info("Job {} completed successfully in {} ms",
                    jobDetail.getKey(), executionTime);
        }
        cleanupJobData(context);
    }

    /**
     * Heuristic: the interval since the previous fire is shorter than the
     * job's last observed run time, so the previous run may still be going.
     */
    private boolean isJobOverlapping(JobExecutionContext context) {
        if (context.getPreviousFireTime() == null) {
            return false;
        }
        Long lastRunTime = lastRunTimes.get(context.getJobDetail().getKey());
        if (lastRunTime == null || lastRunTime < 0) {
            return false; // no completed run observed yet
        }
        long sincePrevious =
                context.getFireTime().getTime() - context.getPreviousFireTime().getTime();
        return sincePrevious < lastRunTime;
    }

    private void handleJobFailure(JobExecutionContext context, JobExecutionException jobException) {
        if (shouldRetry(context)) {
            scheduleRetry(context);
        } else {
            escalateFailure(context, jobException);
        }
    }

    // NOTE(review): getRefireCount() resets for each new trigger, and
    // scheduleRetry creates fresh triggers — so this bound may never be hit
    // across retries. Confirm whether a retry counter should live in the
    // JobDataMap instead.
    private boolean shouldRetry(JobExecutionContext context) {
        return context.getRefireCount() < getMaxRetries(context);
    }

    /** Per-job retry limit from job data, defaulting to 3. */
    private int getMaxRetries(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        return jobDataMap.getInt("maxRetries", 3);
    }

    /** Schedules a one-shot retry trigger with exponential-backoff delay. */
    private void scheduleRetry(JobExecutionContext context) {
        try {
            int retryCount = context.getRefireCount() + 1;
            long retryDelay = calculateRetryDelay(retryCount);
            Trigger retryTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.getJobDetail().getKey())
                    .startAt(DateBuilder.futureDate((int) retryDelay, DateBuilder.IntervalUnit.SECOND))
                    .withIdentity(context.getTrigger().getKey().getName() + "_retry_" + retryCount,
                            "retries")
                    .usingJobData("retryCount", retryCount)
                    .build();
            context.getScheduler().scheduleJob(retryTrigger);
            logger.info("Scheduled retry {} for job {} with {} second delay",
                    retryCount, context.getJobDetail().getKey(), retryDelay);
        } catch (SchedulerException e) {
            logger.error("Failed to schedule retry for job {}",
                    context.getJobDetail().getKey(), e);
        }
    }

    /** Exponential backoff capped at 5 minutes, plus up to 30s of jitter. */
    private long calculateRetryDelay(int retryCount) {
        long baseDelay = Math.min(300, (long) Math.pow(2, retryCount) * 30);
        long jitter = (long) (Math.random() * 30);
        return baseDelay + jitter;
    }

    /** Raises a higher-severity alert once retries are exhausted. */
    private void escalateFailure(JobExecutionContext context, JobExecutionException jobException) {
        alertService.sendEscalationAlert(
                context.getJobDetail().getKey().toString(),
                jobException.getMessage(),
                context.getRefireCount()
        );
    }

    /** Drops transient entries from the job data after each run. */
    private void cleanupJobData(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        jobDataMap.remove("temporaryData");
    }
}
Custom Scheduler Plugin
public class MetricsCollectorPlugin implements SchedulerPlugin {

    private static final Logger logger = LoggerFactory.getLogger(MetricsCollectorPlugin.class);

    private String name;
    private MetricsService metricsService;

    /**
     * Called by Quartz when the plugin is registered; captures the plugin
     * name, creates the metrics backend, and exposes it over JMX.
     *
     * BUG FIX: the original also declared an @Override getExtraProperties()
     * method, but SchedulerPlugin has no such method — it would not compile
     * and has been removed.
     */
    @Override
    public void initialize(String name, Scheduler scheduler,
                           ClassLoadHelper classLoadHelper) throws SchedulerException {
        this.name = name;
        this.metricsService = new MetricsService();
        // Register MBeans for JMX monitoring.
        registerMBeans(scheduler);
    }

    @Override
    public void start() {
        logger.info("Starting MetricsCollectorPlugin: {}", name);
        metricsService.start();
    }

    @Override
    public void shutdown() {
        logger.info("Shutting down MetricsCollectorPlugin: {}", name);
        metricsService.stop();
    }

    /**
     * Best-effort JMX registration — failures are logged, never fatal, so a
     * broken MBean server cannot prevent the scheduler from starting.
     */
    private void registerMBeans(Scheduler scheduler) {
        try {
            MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = new ObjectName("quartz:type=MetricsCollector,name=" + name);
            QuartzMetricsMBean mbean = new QuartzMetricsMBean(scheduler, metricsService);
            mbeanServer.registerMBean(mbean, objectName);
        } catch (Exception e) {
            logger.warn("Failed to register MBean for MetricsCollectorPlugin", e);
        }
    }
}
public class QuartzMetricsMBean implements DynamicMBean {

    private final Scheduler scheduler;
    private final MetricsService metricsService;

    /** Exposes live scheduler counts and pause/resume operations over JMX. */
    public QuartzMetricsMBean(Scheduler scheduler, MetricsService metricsService) {
        this.scheduler = scheduler;
        this.metricsService = metricsService;
    }

    @Override
    public Object getAttribute(String attribute) throws AttributeNotFoundException {
        switch (attribute) {
            case "JobCount":
                return getJobCount();
            case "TriggerCount":
                return getTriggerCount();
            case "SchedulerStatus":
                return getSchedulerStatus();
            default:
                throw new AttributeNotFoundException("Attribute not found: " + attribute);
        }
    }

    @Override
    public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        // All attributes are read-only.
        throw new AttributeNotFoundException("Setting attributes not supported");
    }

    @Override
    public AttributeList getAttributes(String[] attributes) {
        AttributeList list = new AttributeList();
        for (String attribute : attributes) {
            try {
                list.add(new Attribute(attribute, getAttribute(attribute)));
            } catch (AttributeNotFoundException ignored) {
                // Skip attributes this MBean does not expose.
            }
        }
        return list;
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList(); // Setting not supported.
    }

    /**
     * Dispatches JMX operations.
     *
     * BUG FIX: the original wrote {@code return pauseAllJobs();} on void
     * methods, which does not compile — the operations are now invoked as
     * statements and {@code null} is returned (the operations are declared
     * as void in getMBeanInfo).
     */
    @Override
    public Object invoke(String actionName, Object[] params, String[] signature) {
        switch (actionName) {
            case "pauseAllJobs":
                pauseAllJobs();
                return null;
            case "resumeAllJobs":
                resumeAllJobs();
                return null;
            default:
                throw new IllegalArgumentException("Unknown operation: " + actionName);
        }
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        MBeanAttributeInfo[] attributes = new MBeanAttributeInfo[] {
                new MBeanAttributeInfo("JobCount", "int", "Number of jobs", true, false, false),
                new MBeanAttributeInfo("TriggerCount", "int", "Number of triggers", true, false, false),
                new MBeanAttributeInfo("SchedulerStatus", "String", "Scheduler status", true, false, false)
        };
        MBeanOperationInfo[] operations = new MBeanOperationInfo[] {
                new MBeanOperationInfo("pauseAllJobs", "Pause all jobs",
                        new MBeanParameterInfo[0], "void", MBeanOperationInfo.ACTION),
                new MBeanOperationInfo("resumeAllJobs", "Resume all jobs",
                        new MBeanParameterInfo[0], "void", MBeanOperationInfo.ACTION)
        };
        return new MBeanInfo(getClass().getName(), "Quartz Scheduler Metrics",
                attributes, null, operations, null);
    }

    /** -1 signals a lookup failure to the JMX client. */
    private int getJobCount() {
        try {
            return scheduler.getJobKeys(GroupMatcher.anyGroup()).size();
        } catch (SchedulerException e) {
            return -1;
        }
    }

    private int getTriggerCount() {
        try {
            return scheduler.getTriggerKeys(GroupMatcher.anyGroup()).size();
        } catch (SchedulerException e) {
            return -1;
        }
    }

    private String getSchedulerStatus() {
        try {
            return scheduler.isStarted() ? "STARTED" :
                    scheduler.isShutdown() ? "SHUTDOWN" : "STANDBY";
        } catch (SchedulerException e) {
            return "UNKNOWN";
        }
    }

    private void pauseAllJobs() {
        try {
            scheduler.pauseAll();
        } catch (SchedulerException e) {
            // DynamicMBean.invoke cannot throw checked SchedulerException.
            throw new RuntimeException("Failed to pause all jobs", e);
        }
    }

    private void resumeAllJobs() {
        try {
            scheduler.resumeAll();
        } catch (SchedulerException e) {
            throw new RuntimeException("Failed to resume all jobs", e);
        }
    }
}
Distributed Job Processing
Cluster-Aware Job Service
@Service
public class ClusterAwareJobService {

    // BUG FIX: `logger` was used but never declared in the original.
    private static final Logger logger = LoggerFactory.getLogger(ClusterAwareJobService.class);

    private final Scheduler scheduler;
    private final InstanceIdGenerator instanceIdGenerator;

    public ClusterAwareJobService(Scheduler scheduler) {
        this.scheduler = scheduler;
        this.instanceIdGenerator = new SimpleInstanceIdGenerator();
    }

    /**
     * Schedules the job only when this node considers itself active and the
     * job does not already exist in the shared store (another cluster node
     * may have created it first).
     */
    public void scheduleClusterAwareJob(JobDefinition jobDef) throws SchedulerException {
        if (!isActiveNode()) {
            logger.info("Node is not active, skipping job scheduling: {}", jobDef.getName());
            return;
        }
        JobDetail jobDetail = createJobDetail(jobDef);
        Trigger trigger = createTrigger(jobDef);
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, trigger);
            logger.info("Scheduled cluster-aware job: {}", jobDef.getName());
        }
    }

    /**
     * NOTE(review): this compares per-execution fire-instance ids against the
     * scheduler instance id — two different identifier spaces — so it likely
     * never matches. Confirm the intended activity criterion (load, leader
     * election, configuration) before relying on this.
     */
    public boolean isActiveNode() {
        try {
            List<String> activeInstances = scheduler.getCurrentlyExecutingJobs()
                    .stream()
                    .map(context -> context.getFireInstanceId())
                    .collect(Collectors.toList());
            return activeInstances.contains(scheduler.getSchedulerInstanceId());
        } catch (SchedulerException e) {
            logger.error("Failed to check active node status", e);
            return false;
        }
    }

    // BUG FIX: createJobDetail/createTrigger were called but never defined.
    // NOTE(review): these assume JobDefinition exposes getName(), getJobClass()
    // and getJobData() — confirm against the actual JobDefinition type used here.

    /** Builds a JobDetail from the definition, grouped under "cluster_jobs". */
    private JobDetail createJobDetail(JobDefinition jobDef) {
        return JobBuilder.newJob(jobDef.getJobClass())
                .withIdentity(jobDef.getName(), "cluster_jobs")
                .usingJobData(jobDef.getJobData())
                .build();
    }

    /** Builds an immediate-start trigger matching the job's identity. */
    private Trigger createTrigger(JobDefinition jobDef) {
        return TriggerBuilder.newTrigger()
                .withIdentity(jobDef.getName() + "_trigger", "cluster_jobs")
                .startNow()
                .build();
    }

    /**
     * Re-evaluates every job in the store and reschedules those that should
     * run on a different node.
     */
    public void rebalanceJobs() throws SchedulerException {
        List<JobKey> allJobs = new ArrayList<>(scheduler.getJobKeys(GroupMatcher.anyGroup()));
        for (JobKey jobKey : allJobs) {
            if (shouldRebalance(jobKey)) {
                rescheduleJobOnOptimalNode(jobKey);
            }
        }
    }

    /** Placeholder: decide per-job whether rebalancing is warranted. */
    private boolean shouldRebalance(JobKey jobKey) {
        return true;
    }

    /**
     * Deletes the job and re-adds it only when this node is optimal.
     * NOTE(review): if this node is NOT optimal the job is deleted and never
     * re-added anywhere — confirm another node is responsible for picking it
     * up, otherwise jobs can be lost here.
     */
    private void rescheduleJobOnOptimalNode(JobKey jobKey) throws SchedulerException {
        JobDetail jobDetail = scheduler.getJobDetail(jobKey);
        List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
        scheduler.deleteJob(jobKey);
        if (isOptimalNodeForJob(jobDetail)) {
            for (Trigger trigger : triggers) {
                scheduler.scheduleJob(jobDetail, trigger);
            }
            logger.info("Rebalanced job {} to current node", jobKey);
        }
    }

    /** Placeholder: resources/location/load based optimality check. */
    private boolean isOptimalNodeForJob(JobDetail jobDetail) {
        return true;
    }
}
Advanced Error Handling and Recovery
Job Recovery Service
@Service
public class JobRecoveryService {

    // BUG FIX: `logger` was used but never declared in the original.
    private static final Logger logger = LoggerFactory.getLogger(JobRecoveryService.class);

    private final Scheduler scheduler;
    private final JobRegistry jobRegistry;

    public JobRecoveryService(Scheduler scheduler, JobRegistry jobRegistry) {
        this.scheduler = scheduler;
        this.jobRegistry = jobRegistry;
    }

    /**
     * Sweeps currently-executing jobs for ones that have exceeded their
     * allowed run time and attempts recovery, then looks for orphans.
     *
     * @throws SchedulerException if the executing-job snapshot cannot be read
     */
    public void recoverFailedJobs() throws SchedulerException {
        List<JobExecutionContext> currentlyExecuting =
                scheduler.getCurrentlyExecutingJobs();
        for (JobExecutionContext context : currentlyExecuting) {
            if (isJobStuck(context)) {
                recoverStuckJob(context);
            }
        }
        // Check for orphaned jobs.
        recoverOrphanedJobs();
    }

    /** A job is "stuck" when it has run longer than its configured maximum. */
    private boolean isJobStuck(JobExecutionContext context) {
        long executionTime = System.currentTimeMillis() - context.getFireTime().getTime();
        long maxExecutionTime = getMaxExecutionTime(context);
        return executionTime > maxExecutionTime;
    }

    /** Per-job limit from job data; defaults to 5 minutes. */
    private long getMaxExecutionTime(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        return jobDataMap.getLong("maxExecutionTime", 300000L);
    }

    /**
     * Interrupts a stuck job (cooperatively — the job must honor the
     * interrupt) and optionally reschedules it.
     */
    private void recoverStuckJob(JobExecutionContext context) {
        try {
            logger.warn("Job {} appears to be stuck, attempting recovery",
                    context.getJobDetail().getKey());
            // Interrupt the instance directly if it supports interruption...
            if (context.getJobInstance() instanceof InterruptableJob) {
                ((InterruptableJob) context.getJobInstance()).interrupt();
            }
            // ...and also ask the scheduler, which covers other executions of
            // the same job key.
            scheduler.interrupt(context.getJobDetail().getKey());
            if (shouldRescheduleAfterFailure(context)) {
                rescheduleJob(context);
            }
        } catch (UnableToInterruptJobException e) {
            logger.error("Failed to interrupt stuck job: {}",
                    context.getJobDetail().getKey(), e);
        } catch (SchedulerException e) {
            logger.error("Failed to recover stuck job: {}",
                    context.getJobDetail().getKey(), e);
        }
    }

    private void recoverOrphanedJobs() {
        // Implementation to detect and recover orphaned jobs.
        // This would typically involve checking database state vs scheduler state.
    }

    /** Per-job flag from job data; defaults to rescheduling after failure. */
    private boolean shouldRescheduleAfterFailure(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        return jobDataMap.getBoolean("rescheduleAfterFailure", true);
    }

    /** Re-arms the job's trigger to fire 30 seconds from now. */
    private void rescheduleJob(JobExecutionContext context) throws SchedulerException {
        Trigger oldTrigger = context.getTrigger();
        Trigger newTrigger = oldTrigger.getTriggerBuilder()
                .startAt(DateBuilder.futureDate(30, DateBuilder.IntervalUnit.SECOND))
                .build();
        scheduler.rescheduleJob(oldTrigger.getKey(), newTrigger);
    }
}
Testing Advanced Quartz Features
Comprehensive Test Suite
@SpringBootTest
@TestPropertySource(properties = {
        "org.quartz.scheduler.instanceName=TestScheduler",
        "org.quartz.threadPool.threadCount=5",
        "org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore"
})
public class AdvancedQuartzTest {

    @Autowired
    private Scheduler scheduler;

    /**
     * BUG FIX: the original asserted getCurrentlyExecutingJobs() was non-empty
     * after a sleep — but TestJob completes instantly, so the snapshot is
     * almost always empty and the test was flaky. TestJob now counts its
     * executions so the assertion is deterministic.
     */
    @Test
    void testClusterAwareJobScheduling() throws SchedulerException, InterruptedException {
        TestJob.EXECUTIONS.set(0);
        JobDetail job = JobBuilder.newJob(TestJob.class)
                .withIdentity("clusterJob", "test")
                .storeDurably()
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("clusterTrigger", "test")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(1)
                        .repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        // Wait long enough for at least one 1-second-interval firing.
        Thread.sleep(3000);
        assertTrue(TestJob.EXECUTIONS.get() > 0);
    }

    /**
     * Pauses the scheduler long enough to force misfires on a 1-second
     * trigger, then verifies the trigger survives resume (its
     * fire-now misfire policy should keep it scheduled).
     */
    @Test
    void testMisfireHandling() throws SchedulerException, InterruptedException {
        JobDetail job = JobBuilder.newJob(SlowJob.class)
                .withIdentity("slowJob", "test")
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("slowTrigger", "test")
                .startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(1)
                        .withMisfireHandlingInstructionFireNow())
                .build();
        scheduler.scheduleJob(job, trigger);
        // Pause the scheduler to cause misfires, then resume.
        scheduler.pauseAll();
        Thread.sleep(5000);
        scheduler.resumeAll();
        Thread.sleep(3000);
        // The trigger must still exist after misfire handling has run.
        assertTrue(scheduler.checkExists(TriggerKey.triggerKey("slowTrigger", "test")));
    }

    @Test
    void testJobChaining() throws SchedulerException {
        JobChainingService chainingService = new JobChainingService(scheduler);
        List<JobChainingService.JobDefinition> jobs = Arrays.asList(
                new JobChainingService.JobDefinition(FirstJob.class, new JobDataMap()),
                new JobChainingService.JobDefinition(SecondJob.class, new JobDataMap()),
                new JobChainingService.JobDefinition(ThirdJob.class, new JobDataMap())
        );
        chainingService.scheduleJobChain("testChain", jobs);
        // Verify all three chain members were registered.
        assertTrue(scheduler.checkExists(JobKey.jobKey("testChain_job_0", "job_chains")));
        assertTrue(scheduler.checkExists(JobKey.jobKey("testChain_job_1", "job_chains")));
        assertTrue(scheduler.checkExists(JobKey.jobKey("testChain_job_2", "job_chains")));
    }

    /** Counts executions so tests can assert deterministically. */
    public static class TestJob implements Job {
        static final java.util.concurrent.atomic.AtomicInteger EXECUTIONS =
                new java.util.concurrent.atomic.AtomicInteger();

        @Override
        public void execute(JobExecutionContext context) {
            EXECUTIONS.incrementAndGet();
        }
    }

    /** Sleeps to simulate a long-running job for misfire scenarios. */
    public static class SlowJob implements Job {
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JobExecutionException("Job interrupted", e);
            }
        }
    }

    public static class FirstJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            // First job in chain
        }
    }

    public static class SecondJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            // Second job in chain
        }
    }

    public static class ThirdJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            // Third job in chain
        }
    }
}
Conclusion
This advanced Quartz Scheduler implementation provides:
- Cluster Configuration - High-availability and load distribution
- Advanced Job Types - Stateful jobs, chained jobs, and custom triggers
- Sophisticated Misfire Handling - Custom misfire instructions and recovery
- Custom Plugins and Listeners - Enhanced monitoring and management
- Distributed Processing - Cluster-aware job scheduling and rebalancing
- Advanced Error Handling - Comprehensive recovery mechanisms
- Business Calendars - Custom scheduling based on business rules
These advanced features make Quartz suitable for enterprise-grade scheduling requirements with robust fault tolerance, scalability, and maintainability.