quartz主要接口
- Scheduler调度器接口
- JobDetail job描述
- Trigger触发器
- Job 具体的job
- 各种监听器 JobListener TriggerListener SchedulerListener
一个JobDetail包括一个job 其可以绑定多个触发器 ,然后交给调度器调度执行。
基本使用
//通过调度工厂创建调度器
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//添加监听器 可以为指定job 触发器 通过Matcher比较器接口
//scheduler.getListenerManager().addJobListener();
//scheduler.getListenerManager().addTriggerListener();
//scheduler.getListenerManager().addSchedulerListener();
//通过Myjob创建jobDetail Myjob实现Job接口
//主要job实现在execute(JobExecutionContext context)方法
JobDetail jobDetail = JobBuilder.newJob(Myjob.class).withIdentity("jobname","jobgroup").requestRecovery()
.build();
//创建触发器
Trigger trigger =TriggerBuilder.newTrigger()
.withIdentity("triggername","triggergroup")
.withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?"))
.startNow()//立即启动
.build();
//绑定 job和触发器
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();//启动所有job 可以在创建jobDetail 和trigger 之前启动
quartz集群
quartz实现集群的方式时通过数据库锁的方式,其隶属于悲观锁,主要通过其配置文件来初始化调度器,其配置文件名为quartz.properties ,quartz默认使用StdScheduler的实现,其中包括大量的配置,其中部分配置如下
#基础配置
#指定实例名 默认QuartzScheduler
org.quartz.scheduler.instanceName =
#集群模式使用AUTO 全大写
org.quartz.scheduler.instanceId= AUTO
#指定实例id生成器
#org.quartz.scheduler.instanceIdGenerator.class=org.quartz.simpl.SimpleInstanceIdGenerator
#org.quartz.scheduler.threadName =
#org.quartz.scheduler.makeSchedulerThreadDaemon=true
#org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer=
# 线程配置
org.quartz.threadPool.class =org.quartz.simpl.SimpleThreadPool
# 线程数大于0
org.quartz.threadPool.threadCount = 10
#线程优先级
org.quartz.threadPool.threadPriority=5
#job存储 默认使用RAMJobStore 内存存储 集群默认使用JobStoreTX jdbc存储
#jobstore
org.quartz.jobStore.class =org.quartz.impl.jdbcjobstore.JobStoreTX
#数据源名称 必须
org.quartz.jobStore.DataSource=mydatasource
#驱动代理类 必须
org.quartz.jobStore.DriverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
#集群模式必须
org.quartz.jobStore.isClustered=true
#datasource数据源配置 也可使用jndi
org.quartz.dataSource=mydatasource
org.quartz.dataSource.mydatasource.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.mydatasource.URL=jdbc:mysql://192.168.118.134:3306/quartz?characterEncoding=utf-8&useSSL=false
org.quartz.dataSource.mydatasource.user=
org.quartz.dataSource.mydatasource.password=
org.quartz.dataSource.mydatasource.maxConnections=10
#org.quartz.dataSource.mydatasource.connectionProvider.class=com.myquartz.config.MyConnectionProvider
###更多配置 stdSchedulerFactory
调度工厂拥有两种构造器,通过properties 或者filepath 的方式加载配置文件,再通过配置文件创建调度器。
quartz 数据库表
table qrtz_calendars;--存储calendars信息
table qrtz_fired_triggers;--存储已触发的触发器的状态和相关job信息
table qrtz_blob_triggers;--属于blob类型存储的自定义类型触发器
table qrtz_cron_triggers;--存储cron_triggers的触发器
table qrtz_simple_triggers;--存储简单的触发器
table qrtz_simprop_triggers;--存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型触发器
table qrtz_triggers;--存储触发器信息
table qrtz_job_details;-- 存储job信息
table qrtz_paused_trigger_grps;-- 存储已暂停的触发器
table qrtz_locks;--存储锁信息
table qrtz_scheduler_state;--存储调度器状态
spring集成quartz
spring中已经集成了quartz ,其源码位置在spring-context 和 spring-context-support 下的scheduling包下,在传统的xml配置方式中,有个关键的类SchedulerFactoryBean,是作为spring和quartz连接的桥梁,在springboot中再次升级,采用自动配置的方式,所采用的类为QuartzAutoConfiguration,万变不离其宗,其中提供了关键了SchedulerFactoryBean的bean注入。
SchedulerFactoryBean
SchedulerFactoryBean实现了InitializingBean ,其实现方法afterPropertiesSet即使关键中的关键,该方法默认会通过schedulerFactory创建一个stdscheduler,同时向stdscheduler中添加job ,触发器,以及监听器。
@Override
public void afterPropertiesSet() throws Exception {
.......
this.scheduler = prepareScheduler(prepareSchedulerFactory());
try {
registerListeners();
registerJobsAndTriggers();
}
catch (Exception ex) {
......
}
}
private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
SchedulerFactory schedulerFactory = this.schedulerFactory;
if (schedulerFactory == null) {
schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
...
return schedulerFactory;
}
QuartzAutoConfiguration
QuartzAutoConfiguration类是一个自动配置类,在项目启动时,通过@EnableAutoConfiguration进行配置的加载,该类出现在spring.factories文件中。其头部包括大量注解,通过@EnableConfigurationProperties(QuartzProperties.class)可以将quartz的配置信息从application配置文件中加载进来。AutoConfigureAfter则表示该类需要在指定的几个类都自动配置完成之后再进行配置。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class, PlatformTransactionManager.class })
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
LiquibaseAutoConfiguration.class, FlywayAutoConfiguration.class })
该类最为重要的是其中的quartzScheduler方法,其参数很多,具体如下,包括 几个重要的bean 其中有jobDetails,和 triggers 分别是任务和触发器的bean 表示该类方法还可将注册为bean的job和触发器 交由调度器调度 ,所以也可以直接将job和触发器注册为bean。
@Bean
@ConditionalOnMissingBean
public SchedulerFactoryBean quartzScheduler(QuartzProperties properties,
ObjectProvider<SchedulerFactoryBeanCustomizer> customizers, ObjectProvider<JobDetail> jobDetails,
Map<String, Calendar> calendars, ObjectProvider<Trigger> triggers, ApplicationContext applicationContext)
工具类
application.yml
spring:
quartz:
schedulerName: myscheduler
autoStartup: true
job-store-type: JDBC
# jdbc:
# schema: classpath:org/quartz/impl/jdbcjobstore/tables_mysql.sql
# initialize-schema: ALWAYS
# 可以用以添加quartz.properties有的配置
properties:
org:
quartz:
scheduler:
#全大写
instanceId: AUTO
threadPool:
class : org.quartz.simpl.SimpleThreadPool
threadCount : 10
threadPriority: 5
jobStore:
isClustered: true
quartzMgr.java
package com.myquartz.utils;
import org.quartz.*;
import java.util.ArrayList;
import java.util.List;
/**
* quartz 工具类
* *Note: 一个job可以绑定多个触发器 一个触发器只能绑定一个job
* 使用
* 1.将该类注入通过QuartzMgr(Scheduler scheduler)实例化
* 2.创建job createJob(...)
* 3.绑定触发器 bindTrigger(....)
*
* spring 注入
* @ConditionalOnClass(value={SchedulerFactoryBean.class})
* @Bean
* public QuartzMgr quartzMgr(SchedulerFactoryBean quartzScheduler){
* QuartzMgr quartzMgr =new QuartzMgr(quartzScheduler.getScheduler());
* return quartzMgr;
* }
*/
public final class QuartzMgr {
private QuartzMgr(){}
private static Scheduler scheduler = null;
/**
* 初始化
* @param scheduler
*/
public QuartzMgr(Scheduler scheduler){
if(this.scheduler==null){
synchronized (QuartzMgr.class){
if(this.scheduler==null){
this.scheduler = scheduler;
}
}
}
}
/**
* 创建 job
* @param jobKey job名称 组
* @param job job执行class
* @param jobdescription job描述
* @param map job传入参数 可在job类中通过JobExecutionContext获得
* @return
* @throws SchedulerException
*/
public JobKey createJob(JobKey jobKey,Class job,String jobdescription,JobDataMap map) throws SchedulerException, ClassNotFoundException {
if(scheduler.checkExists(jobKey)){
throw new SchedulerException(jobKey.toString()+"该job已存在");
// return jobKey;
}
if(!Job.class.isAssignableFrom(job)){//判断是Job的实现
throw new SchedulerException(jobKey.toString()+"该Class不是一个Job实现");
}
JobDetail jobDetail = JobBuilder.newJob()
.ofType(job)
.storeDurably(true)//无论是否关联trigger都存在
.withIdentity(jobKey)
.withDescription(jobdescription)
.requestRecovery(true)//调度失败请求重新调度
.setJobData(map)
.build();
scheduler.addJob(jobDetail,false);
return jobKey;
}
/**
* @see DailyTimeIntervalScheduleBuilder
* @see CronScheduleBuilder
* @see CalendarIntervalScheduleBuilder
* @see SimpleScheduleBuilder
* job绑定一个新触发器
* 一个job可以绑定多个触发器, 但是一个触发器只能绑定一个job
* @param jobKey 已存在的job 或者 createJob()创建
* @param triggerKey 触发器名称
* @param map 参数
* @param schedBuilder
* @see DailyTimeIntervalScheduleBuilder
* @see CronScheduleBuilder
* @see CalendarIntervalScheduleBuilder
* @see SimpleScheduleBuilder
* @param triggerdescription trigger描述
* @return
* @throws SchedulerException
*/
public void bindTrigger(JobKey jobKey,TriggerKey triggerKey,JobDataMap map,ScheduleBuilder schedBuilder,String triggerdescription) throws SchedulerException {
if(scheduler.checkExists(triggerKey)){
throw new SchedulerException(triggerKey.toString()+"该触发器已存在");
}
if(!scheduler.checkExists(jobKey)){
throw new SchedulerException(jobKey.toString()+"该job不存在");
}
TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();
triggerBuilder
.usingJobData(map)
.withIdentity(triggerKey)
.withSchedule(schedBuilder)//设定出发时间
.withDescription(triggerdescription)
.forJob(jobKey)
.startNow();
Trigger trigger = triggerBuilder.build();
scheduler.scheduleJob(trigger);
// return true;
}
/**
* 更新触发时点
* @param
* @return
*/
public void updateTrigger(TriggerKey oldtriggerKey,JobDataMap map,ScheduleBuilder schedBuilder,String triggerdescription) throws SchedulerException {
if( !scheduler.checkExists(oldtriggerKey) )
{
throw new SchedulerException(oldtriggerKey.toString()+"不存在");
}
Trigger oldtrigger = scheduler.getTrigger(oldtriggerKey);
if (oldtrigger==null){
throw new SchedulerException(oldtriggerKey.toString()+"不存在");
}
JobKey jobKey = oldtrigger.getJobKey();
Trigger newTrigger = TriggerBuilder.newTrigger()
.startNow()
.usingJobData(map)
.forJob(jobKey)
.withIdentity(oldtriggerKey)
.withSchedule(schedBuilder)
.withDescription(triggerdescription)
.build();
scheduler.rescheduleJob(oldtriggerKey,newTrigger);
// return true;
}
/**
* 删除任务
* @return
*/
public void delScheduler(JobKey jobKey) throws SchedulerException {
if(!scheduler.checkExists(jobKey)){
throw new SchedulerException("job"+jobKey.toString()+"不存在");
}
List<TriggerKey> lists =getTriggerKeysOfJob(jobKey);
for(TriggerKey triggerKey : lists){
//停止触发器
scheduler.pauseTrigger(triggerKey);
//移除触发器
scheduler.unscheduleJob(triggerKey);
}
//删除任务
scheduler.deleteJob(jobKey);
//return true;
}
/**
* 停止任务
* @param jobKey
* @param triggerKey
* @return
*/
public boolean pauseScheduler(JobKey jobKey,TriggerKey triggerKey) throws SchedulerException {
if(!scheduler.checkExists(jobKey)|| !scheduler.checkExists(triggerKey)){
throw new SchedulerException("触发器"+triggerKey.toString()+"不存在或者job"+jobKey.toString()+"不存在");
}
scheduler.pauseTrigger(triggerKey);
scheduler.pauseJob(jobKey);
return true;
}
/**
* 恢复任务
* @param jobKey
* @param triggerKey
* @return
*/
public boolean resumeScheduler(JobKey jobKey,TriggerKey triggerKey) throws SchedulerException {
if(!scheduler.checkExists(jobKey)|| !scheduler.checkExists(triggerKey)){
throw new SchedulerException("触发器"+triggerKey.toString()+"不存在或者job"+jobKey.toString()+"不存在");
}
scheduler.resumeJob(jobKey);
scheduler.resumeTrigger(triggerKey);
return true;
}
/**
* 获得所有于该job绑定的触发器
* @param jobKey
* @return
*/
public List<TriggerKey> getTriggerKeysOfJob(JobKey jobKey) throws SchedulerException {
if(!scheduler.checkExists(jobKey)){
throw new SchedulerException("job"+jobKey.toString()+"不存在");
}
List<TriggerKey> triggerKeys = new ArrayList<>();
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger t:triggers) {
triggerKeys.add(t.getKey());
}
return triggerKeys;
}
/**
* 获得触发器状态
*
* @param triggerKey
* @return NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED
* @throws SchedulerException
*/
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) throws SchedulerException {
return scheduler.getTriggerState(triggerKey);
}
/**
* 获得正在执行的job
* @return
* @throws SchedulerException
*/
public List<JobKey> getExectingJob() throws SchedulerException {
List<JobKey> jobKeys = new ArrayList<>();
List<JobExecutionContext> lists =scheduler.getCurrentlyExecutingJobs();
for(JobExecutionContext jobExecutionContext : lists){
jobKeys.add(jobExecutionContext.getJobDetail().getKey());
}
return jobKeys;
}
}