quartz cluster原理与实践

调度任务的特点是执行频率低,执行时间长,不能重复执行,最典型的场景是生成报表。之前为了保证定时任务不重复执行,调度程序都是单机部署,单机部署最大的问题是存在单点故障,单点故障对系统可靠性是最大的杀手,要尽量避免。

正如上面所说,单机部署调度程序主要是为了避免任务重复执行,那如何在分布式部署环境中做到这点呢,很容易想到的一种方法是:锁+已执行标记, 通过加锁保证不会并发执行,通过已执行标记保证后来获取锁的实例即使获取了锁也不会再次执行,加锁还要根据任务是不是纯数据库操作选择数据库锁或者分布式锁,如果采用这种方法,开发人员对每一种定时任务都要开发额外的代码保证不重复执行,对开发人员来说既是负担又容易犯错。quartz cluster可以从框架层面解决这个问题,只需要开发人员遵循少量的原则。

原理

为了让大家读起来更容易,下面会从较高的层面上分析quartz cluster的工作原理,不会深入代码细节,(想看细节的可以参考这篇文章)。

quartz中有两个重要概念trigger和job, 一个job可以由多个trigger触发. quartz cluster通过trigger表控制任务的调度,trigger表中最关键字段是:job_name, job_group, next_fire_time, trigger_state。前两个字段用于关联任务表,next_fire_time保存下一次触发时间,trigger_state用于控制调度状态,是保证不重复执行的关键。

  • 调度程序第一次启动

    quartz会将定时任务配置保存到数据库中(trigger表和job_detail表),计算最近一次的触发时间并保存到next_fire_time字段中. 同时会启动一个后台调度线程循环获取待执行任务并触发执行。

  • 后台调度线程的工作过程

    1. (运行在事务中)从trigger表中获取状态为waiting的待执行的trigger(sql语句是伪代码),并将获取到的trigger状态置为acquired, 计算并保存下一次触发时间。

      select * from trigger where state='waiting' and next_fire_time between now() - misfireThreshold and now() + idleTime
      

      misfireThreshold和idleTime都是quartz的配置项,前者表示即使触发时间已经过去了,只要在一定范围内还是允许执行,后者代表未来的一个时间窗口,在这个窗口内触发的定时任务都可以被该调度器实例提前锁定。

    2. 如果第1步获取到了任务,执行wait()方法,直到触发时间到来。

    3. (运行在事务中)触发时间到来后,如果任务设置为不可并发执行,则将与该任务相关的所有trigger状态从acquired变为blocked.如果允许并发执行,将trigger状态从acquired变为waiting.
    4. 将trigger中的任务放入线程池中执行
    5. 任务执行完毕后,如果任务是不可并发执行的,将trigger的状态从blocked变为waiting

      保证任务不会重复执行的关键是第3步,任务执行过程中trigger的状态是blocked,其他调度器实例就无法调度该trigger了
  • 失败任务的恢复

    如果调度器实例锁定trigger后还未执行完任务就down掉了,那这个trigger锁定的任务如何转移到其他存活的调度器实例呢?这就要依赖check in机制了,正常运行状态下,每个调度器实例每隔固定时间(比如20秒)会去数据库更新自己的状态记录(scheduler_state表中last_checkin_time字段), 并且检查其他实例有没有按时check_in,如果没有则认为实例失败了,释放失败实例之前锁定的trigger(将trigger状态从acquired,blocked变为waiting)。

实践

  1. 准备工作

    quartz cluster非常依赖各调度器实例所在服务器的时间保持同步,配置时间同步可参考这篇文章

  2. 新建一个数据库,执行quartz的初始化脚本, 脚本中表的前缀默认为QUARTZ,可以修改成其他的。

  3. 在调度器实例中增加如下依赖

    
    dependencies {
     compile project(':framework:framework-base')
     compile project(':framework:framework-dao-base')
    }
    
  4. Spring 配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx" 
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context                http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/aop                    http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/mvc                    http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd
           http://mybatis.org/schema/mybatis-spring                    http://mybatis.org/schema/mybatis-spring.xsd
           http://www.springframework.org/schema/tx                    http://www.springframework.org/schema/tx/spring-tx.xsd">
    
       <context:annotation-config />  
       <context:component-scan base-package="com.cana.quartz" />  
    
       <bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">  
           <property name="dataSource" ref="quartzDataSource" />  
           <property name="quartzProperties">  
               <props>  
                   <!-- 集群模式下最好指定为AUTO,在程序启动时会自动为该调度器实例分配instanceId, 默认生成规则:当前主机的hostname+当前系统时间 -->
                   <prop key="org.quartz.scheduler.instanceId">AUTO</prop>  
                   <!-- 获取在未来多少秒内要被触发的trigger -->
                   <prop key="org.quartz.scheduler.idleWaitTime">30000</prop>  
                   <!-- 每次最大获取的trigger数量 -->
                   <prop key="org.quartz.scheduler.batchTriggerAcquisitionMaxCount">1</prop>  
                   <!-- 线程池配置 -->  
                   <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>  
                   <prop key="org.quartz.threadPool.threadCount">6</prop>  
                   <prop key="org.quartz.threadPool.threadPriority">5</prop>  
                   <!-- 集群配置 -->  
                   <prop key="org.quartz.jobStore.isClustered">true</prop>  
                   <!-- 每个调度器实例每隔指定20秒签到一次,如果未准时签到,就被认为失败,失败实例之前正在执行的任务会被其他实例接管 -->
                   <prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop>  
                   <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>  
                   <prop key="org.quartz.jobStore.misfireThreshold">120000</prop>  
                   <prop key="org.quartz.jobStore.tablePrefix">TEST_</prop>  
               </props>  
           </property>  
           <!-- 这个字段用于标识不同的调度器,每个调度程序的名称要保证不一样,否则可能会出现trigger,job相互覆盖的情况 -->
           <property name="schedulerName" value="testClusterScheduler13" />  
           <!-- 进程启动15秒后调度器才开始工作 -->
           <property name="startupDelay" value="15" />  
           <property name="applicationContextSchedulerContextKey" value="applicationContext" />  
           <!-- 使用此实现可以为job实现类自动注入依赖 -->
           <property name="jobFactory">  
               <bean class="com.travelzen.framework.quartz.AutowireSpringBeanJobFactory" />  
           </property>  
           <property name="overwriteExistingJobs" value="true" />  
           <property name="autoStartup" value="true" />  
           <property name="triggers">  
               <list>  
                   <ref bean="testTrigger" />  
               </list>  
           </property>  
       </bean>  
       <!-- 触发器定义 -->
       <bean id="testTrigger"  class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">  
           <property name="cronExpression" value="0/2 * * * * ?" />  
           <property name="jobDetail" ref="testJobDetail" />  
       </bean>  
        <!-- 任务定义 -->
       <bean id="testJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">  
           <property name="jobClass" value="com.cana.quartz.job.TestJob" />  
           <property name="durability" value="true" />  
           <!-- 设置为true时,失败任务会被其他实例接管重新执行 -->
           <property name="requestsRecovery" value="true" />  
       </bean>        
        <!-- quartz数据库定义 -->
       <bean id="quartzDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
           <property name="driverClassName" value="com.mysql.jdbc.Driver" />        
           <property name="url" value="jdbc:mysql://192.168.1.7:3306/quartz" /> 
           <property name="username" value="root" />         
           <property name="password" value="xxxxx" />         
           <property name="maxActive" value="10"></property>
           <property name="minIdle" value="3"></property>  
           <property name="maxIdle" value="5"></property>  
       </bean>    
    
    </beans>
    
  5. 定时任务定义

    package com.cana.quartz.job;
    
    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    
    import com.cana.quartz.service.IEchoService;
    import com.travelzen.framework.core.time.DateTimeUtil;
    
    // 如果实例不允许并发执行,一定要加这个标签
    @DisallowConcurrentExecution
    public class TestJob extends QuartzJobBean{  
    
       @Autowired
       private IEchoService echoService;
    
       public void executeInternal(JobExecutionContext context) throws JobExecutionException {  
           System.out.println(DateTimeUtil.datetime14() + ":" + context.getTrigger().getKey().getName() + "-begin");
           echoService.echo();
           try {
               Thread.sleep(10 * 1000);
           } catch (InterruptedException e) {
               // TODO Auto-generated catch block
               e.printStackTrace();
           }
           System.out.println(DateTimeUtil.datetime14() + ":" + context.getTrigger().getKey().getName() + "-end");
       }  
    
    }
    


    任务如果不能并发运行,一定要在类的上面加@DisallowConcurrentExecution

  6. 定制的JobFactory,实现了Spring依赖注入

    
    package com.travelzen.framework.quartz;
    
    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.scheduling.quartz.SpringBeanJobFactory;
    
    public class AutowireSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {  
    
       private transient AutowireCapableBeanFactory beanFactory;  
    
       public void setApplicationContext(final ApplicationContext context) {  
           beanFactory = context.getAutowireCapableBeanFactory();  
       }  
    
       @Override  
       protected Object createJobInstance(final TriggerFiredBundle bundle)  
               throws Exception {  
           final Object job = super.createJobInstance(bundle);  
           beanFactory.autowireBean(job);  
           return job;  
       }  
    }
    

    使用此JobFactory可以在通过@Autowired的方式为Job类中的字段自动注入值。

  7. 如何删除不再使用的trigger

    所有定义过的trigger都会在调度器实例启动时保存在数据库中,如果后来某个trigger不再使用了,光从spring xml中删除此trigger的定义并不会自动从数据库中删除对应的记录,调度器实例还是会去触发此trigger, 解决此问题的方法如下:

在spring配置文件中更改schedulerName的值, 在原来的名称后面加一个新的数字后缀,这样原来的trigger即使存在于数据库中也没有调度器实例去调度了。

   <property name="schedulerName" value="testClusterScheduler13" />

results matching ""

    No results matching ""