Quartz集群

  1. quartz主要接口
  2. 基本使用
  3. quartz集群
  4. quartz 数据库表
  5. spring集成quartz
    1. SchedulerFactoryBean
    2. QuartzAutoConfiguration
    3. 工具类
      1. application.yml
      2. quartzMgr.java

quartz主要接口

  • Scheduler调度器接口
  • JobDetail job描述
  • Trigger触发器
  • Job 具体的job
  • 各种监听器 JobListener TriggerListener SchedulerListener

一个JobDetail包括一个job 其可以绑定多个触发器 ,然后交给调度器调度执行。

基本使用

//通过调度工厂创建调度器
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        //添加监听器 可以为指定job 触发器 通过Matcher比较器接口
        //scheduler.getListenerManager().addJobListener();
        //scheduler.getListenerManager().addTriggerListener();
        //scheduler.getListenerManager().addSchedulerListener();
        //通过Myjob创建jobDetail Myjob实现Job接口
        //主要job实现在execute(JobExecutionContext context)方法
        JobDetail jobDetail = JobBuilder.newJob(Myjob.class).withIdentity("jobname","jobgroup").requestRecovery()
                .build();
        //创建触发器 
        Trigger trigger =TriggerBuilder.newTrigger()
        .withIdentity("triggername","triggergroup")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?"))
                .startNow()//立即启动
                .build();
        //绑定 job和触发器
        scheduler.scheduleJob(jobDetail,trigger);
        scheduler.start();//启动所有job  可以在创建jobDetail 和trigger 之前启动

quartz集群

quartz实现集群的方式时通过数据库锁的方式,其隶属于悲观锁,主要通过其配置文件来初始化调度器,其配置文件名为quartz.properties ,quartz默认使用StdScheduler的实现,其中包括大量的配置,其中部分配置如下

#基础配置
#指定实例名 默认QuartzScheduler
org.quartz.scheduler.instanceName =
#集群模式使用AUTO 全大写
org.quartz.scheduler.instanceId= AUTO
#指定实例id生成器
#org.quartz.scheduler.instanceIdGenerator.class=org.quartz.simpl.SimpleInstanceIdGenerator
#org.quartz.scheduler.threadName =
#org.quartz.scheduler.makeSchedulerThreadDaemon=true
#org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer=
# 线程配置
org.quartz.threadPool.class =org.quartz.simpl.SimpleThreadPool
# 线程数大于0
org.quartz.threadPool.threadCount = 10
#线程优先级
org.quartz.threadPool.threadPriority=5
#job存储 默认使用RAMJobStore 内存存储 集群默认使用JobStoreTX  jdbc存储
#jobstore
org.quartz.jobStore.class =org.quartz.impl.jdbcjobstore.JobStoreTX
#数据源名称 必须
org.quartz.jobStore.DataSource=mydatasource
#驱动代理类 必须
org.quartz.jobStore.DriverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
#集群模式必须
org.quartz.jobStore.isClustered=true

#datasource数据源配置  也可使用jndi
org.quartz.dataSource=mydatasource
org.quartz.dataSource.mydatasource.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.mydatasource.URL=jdbc:mysql://192.168.118.134:3306/quartz?characterEncoding=utf-8&useSSL=false
org.quartz.dataSource.mydatasource.user=
org.quartz.dataSource.mydatasource.password=
org.quartz.dataSource.mydatasource.maxConnections=10


#org.quartz.dataSource.mydatasource.connectionProvider.class=com.myquartz.config.MyConnectionProvider

        
###更多配置 stdSchedulerFactory

调度工厂拥有两种构造器,通过properties 或者filepath 的方式加载配置文件,再通过配置文件创建调度器。

quartz 数据库表

 table qrtz_calendars;--存储calendars信息
 table qrtz_fired_triggers;--存储已触发的触发器的状态和相关job信息
 table qrtz_blob_triggers;--属于blob类型存储的自定义类型触发器
 table qrtz_cron_triggers;--存储cron_triggers的触发器
 table qrtz_simple_triggers;--存储简单的触发器
 table qrtz_simprop_triggers;--存储CalendarIntervalTrigger和DailyTimeIntervalTrigger两种类型触发器
 table qrtz_triggers;--存储触发器信息
 table qrtz_job_details;-- 存储job信息
 table qrtz_paused_trigger_grps;-- 存储已暂停的触发器
 table qrtz_locks;--存储锁信息
 table qrtz_scheduler_state;--存储调度器状态

spring集成quartz

spring中已经集成了quartz ,其源码位置在spring-context 和 spring-context-support 下的scheduling包下,在传统的xml配置方式中,有个关键的类SchedulerFactoryBean,是作为spring和quartz连接的桥梁,在springboot中再次升级,采用自动配置的方式,所采用的类为QuartzAutoConfiguration,万变不离其宗,其中提供了关键了SchedulerFactoryBean的bean注入。

SchedulerFactoryBean

SchedulerFactoryBean实现了InitializingBean ,其实现方法afterPropertiesSet即使关键中的关键,该方法默认会通过schedulerFactory创建一个stdscheduler,同时向stdscheduler中添加job ,触发器,以及监听器。

@Override
	public void afterPropertiesSet() throws Exception {
		.......
		this.scheduler = prepareScheduler(prepareSchedulerFactory());
		try {
			registerListeners();
			registerJobsAndTriggers();
		}
		catch (Exception ex) {
			......
		}
	}
	private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
		SchedulerFactory schedulerFactory = this.schedulerFactory;
		if (schedulerFactory == null) {
			schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
...
		return schedulerFactory;
	}

QuartzAutoConfiguration

QuartzAutoConfiguration类是一个自动配置类,在项目启动时,通过@EnableAutoConfiguration进行配置的加载,该类出现在spring.factories文件中。其头部包括大量注解,通过@EnableConfigurationProperties(QuartzProperties.class)可以将quartz的配置信息从application配置文件中加载进来。AutoConfigureAfter则表示该类需要在指定的几个类都自动配置完成之后再进行配置。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class, PlatformTransactionManager.class })
@EnableConfigurationProperties(QuartzProperties.class)
@AutoConfigureAfter({ DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
		LiquibaseAutoConfiguration.class, FlywayAutoConfiguration.class })

该类最为重要的是其中的quartzScheduler方法,其参数很多,具体如下,包括 几个重要的bean 其中有jobDetails,和 triggers 分别是任务和触发器的bean 表示该类方法还可将注册为bean的job和触发器 交由调度器调度 ,所以也可以直接将job和触发器注册为bean。

@Bean
	@ConditionalOnMissingBean
	public SchedulerFactoryBean quartzScheduler(QuartzProperties properties,
			ObjectProvider<SchedulerFactoryBeanCustomizer> customizers, ObjectProvider<JobDetail> jobDetails,
			Map<String, Calendar> calendars, ObjectProvider<Trigger> triggers, ApplicationContext applicationContext)

工具类

application.yml

spring:
  quartz:
      schedulerName: myscheduler
      autoStartup: true
      job-store-type: JDBC
#      jdbc:
#          schema: classpath:org/quartz/impl/jdbcjobstore/tables_mysql.sql
#      initialize-schema: ALWAYS
# 可以用以添加quartz.properties有的配置
      properties: 
          org:
            quartz:
				scheduler:
#全大写
                    instanceId: AUTO
                threadPool:
                    class : org.quartz.simpl.SimpleThreadPool
                    threadCount : 10
                    threadPriority: 5
                jobStore:
                    isClustered: true

quartzMgr.java

package com.myquartz.utils;

import org.quartz.*;

import java.util.ArrayList;
import java.util.List;


/**
 * quartz 工具类
 * *Note: 一个job可以绑定多个触发器 一个触发器只能绑定一个job
 * 使用
 *  1.将该类注入通过QuartzMgr(Scheduler scheduler)实例化
 *  2.创建job createJob(...)
 *  3.绑定触发器 bindTrigger(....)
 *
 *  spring 注入
 *   @ConditionalOnClass(value={SchedulerFactoryBean.class})
 *    @Bean
 *    public QuartzMgr quartzMgr(SchedulerFactoryBean quartzScheduler){
 *         QuartzMgr quartzMgr =new QuartzMgr(quartzScheduler.getScheduler());
 *         return quartzMgr;
 *    }
 */

public final class QuartzMgr {

    private  QuartzMgr(){}
    private static Scheduler scheduler = null;

    /**
     * 初始化
     * @param scheduler
     */
    public  QuartzMgr(Scheduler scheduler){
        if(this.scheduler==null){
            synchronized (QuartzMgr.class){
                if(this.scheduler==null){
                    this.scheduler = scheduler;
                }
            }
        }
    }

    /**
     * 创建 job
     * @param jobKey job名称 组
     * @param job job执行class
     * @param jobdescription job描述
     * @param map job传入参数 可在job类中通过JobExecutionContext获得
     * @return
     * @throws SchedulerException
     */
    public JobKey createJob(JobKey jobKey,Class job,String jobdescription,JobDataMap map) throws SchedulerException, ClassNotFoundException {
        if(scheduler.checkExists(jobKey)){
            throw  new SchedulerException(jobKey.toString()+"该job已存在");
//            return jobKey;
        }
       if(!Job.class.isAssignableFrom(job)){//判断是Job的实现
           throw  new SchedulerException(jobKey.toString()+"该Class不是一个Job实现");
       }
        JobDetail jobDetail = JobBuilder.newJob()
                .ofType(job)
                .storeDurably(true)//无论是否关联trigger都存在
                .withIdentity(jobKey)
                .withDescription(jobdescription)
                .requestRecovery(true)//调度失败请求重新调度
                .setJobData(map)
                .build();
        scheduler.addJob(jobDetail,false);
        return jobKey;
    }


    /**
     * @see DailyTimeIntervalScheduleBuilder
     * @see CronScheduleBuilder
     * @see CalendarIntervalScheduleBuilder
     * @see SimpleScheduleBuilder
     * job绑定一个新触发器
     * 一个job可以绑定多个触发器, 但是一个触发器只能绑定一个job
     * @param jobKey 已存在的job 或者 createJob()创建
     * @param triggerKey 触发器名称
     * @param map  参数
     * @param schedBuilder
     * @see DailyTimeIntervalScheduleBuilder
     * @see CronScheduleBuilder
     * @see CalendarIntervalScheduleBuilder
     * @see SimpleScheduleBuilder
     * @param triggerdescription trigger描述
     * @return
     * @throws SchedulerException
     */
    public void bindTrigger(JobKey jobKey,TriggerKey triggerKey,JobDataMap map,ScheduleBuilder schedBuilder,String triggerdescription) throws SchedulerException {
        if(scheduler.checkExists(triggerKey)){
            throw  new SchedulerException(triggerKey.toString()+"该触发器已存在");
        }
        if(!scheduler.checkExists(jobKey)){
            throw  new SchedulerException(jobKey.toString()+"该job不存在");
        }
        TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();
        triggerBuilder
                .usingJobData(map)
                .withIdentity(triggerKey)
                .withSchedule(schedBuilder)//设定出发时间
                .withDescription(triggerdescription)
                .forJob(jobKey)
                .startNow();
        Trigger trigger = triggerBuilder.build();
        scheduler.scheduleJob(trigger);
//        return true;

    }




    /**
     * 更新触发时点
     * @param
     * @return
     */
    public void updateTrigger(TriggerKey oldtriggerKey,JobDataMap map,ScheduleBuilder schedBuilder,String triggerdescription) throws SchedulerException {
        if( !scheduler.checkExists(oldtriggerKey) )
        {
            throw  new SchedulerException(oldtriggerKey.toString()+"不存在");
        }
        Trigger oldtrigger = scheduler.getTrigger(oldtriggerKey);
        if (oldtrigger==null){
            throw  new SchedulerException(oldtriggerKey.toString()+"不存在");
        }
        JobKey jobKey = oldtrigger.getJobKey();
        Trigger newTrigger = TriggerBuilder.newTrigger()
                    .startNow()
                    .usingJobData(map)
                    .forJob(jobKey)
                    .withIdentity(oldtriggerKey)
                    .withSchedule(schedBuilder)
                    .withDescription(triggerdescription)
                    .build();
        scheduler.rescheduleJob(oldtriggerKey,newTrigger);
//        return true;
    }

    /**
     * 删除任务
     * @return
     */
    public void delScheduler(JobKey jobKey) throws SchedulerException {
        if(!scheduler.checkExists(jobKey)){
            throw  new SchedulerException("job"+jobKey.toString()+"不存在");
        }
        List<TriggerKey> lists =getTriggerKeysOfJob(jobKey);
        for(TriggerKey triggerKey : lists){
            //停止触发器
            scheduler.pauseTrigger(triggerKey);
            //移除触发器
            scheduler.unscheduleJob(triggerKey);
        }
        //删除任务
        scheduler.deleteJob(jobKey);
        //return true;
    }

    /**
     * 停止任务
     * @param jobKey
     * @param triggerKey
     * @return
     */
    public boolean pauseScheduler(JobKey jobKey,TriggerKey triggerKey) throws SchedulerException {
        if(!scheduler.checkExists(jobKey)|| !scheduler.checkExists(triggerKey)){
            throw  new SchedulerException("触发器"+triggerKey.toString()+"不存在或者job"+jobKey.toString()+"不存在");
        }
        scheduler.pauseTrigger(triggerKey);
        scheduler.pauseJob(jobKey);
        return true;
    }


    /**
     * 恢复任务
     * @param jobKey
     * @param triggerKey
     * @return
     */
    public boolean resumeScheduler(JobKey jobKey,TriggerKey triggerKey) throws SchedulerException {
        if(!scheduler.checkExists(jobKey)|| !scheduler.checkExists(triggerKey)){
            throw  new SchedulerException("触发器"+triggerKey.toString()+"不存在或者job"+jobKey.toString()+"不存在");
        }
        scheduler.resumeJob(jobKey);
        scheduler.resumeTrigger(triggerKey);

        return true;
    }

    /**
     * 获得所有于该job绑定的触发器
     * @param jobKey
     * @return
     */
    public List<TriggerKey> getTriggerKeysOfJob(JobKey jobKey) throws SchedulerException {
        if(!scheduler.checkExists(jobKey)){
            throw  new SchedulerException("job"+jobKey.toString()+"不存在");
        }
        List<TriggerKey> triggerKeys = new ArrayList<>();
        List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
        for (Trigger t:triggers) {
            triggerKeys.add(t.getKey());
        }
        return triggerKeys;
    }

    /**
     * 获得触发器状态
     *
     * @param triggerKey
     * @return NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED
     * @throws SchedulerException
     */
    public Trigger.TriggerState getTriggerState(TriggerKey triggerKey) throws SchedulerException {
        return  scheduler.getTriggerState(triggerKey);
    }

    /**
     * 获得正在执行的job
     * @return
     * @throws SchedulerException
     */
    public  List<JobKey> getExectingJob() throws SchedulerException {
        List<JobKey> jobKeys = new ArrayList<>();
        List<JobExecutionContext> lists =scheduler.getCurrentlyExecutingJobs();
        for(JobExecutionContext jobExecutionContext : lists){
            jobKeys.add(jobExecutionContext.getJobDetail().getKey());
        }
        return jobKeys;
    }


}