Reposted from: Cluster Scheduling Mechanism Study and Source Code Analysis

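Quartz 2.2.1 Cluster Scheduling: Mechanism Study and Source Code Analysis
Introduction
Quartz cluster architecture
Scheduler instantiation
The scheduling process
Acquiring triggers
Firing the trigger:
Job execution:
Summary:
Appendix: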

 

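Introduction

Quartz is currently the most mature and most widely used task scheduling framework for Java. It is powerful and flexible to configure, and it plays an important role in enterprise applications. How to use Quartz in a clustered environment is a question every enterprise-grade system has to consider. As early as 2006 there was a discussion of Quartz clustering options on ITeye: http://www.iteye.com/topic/40970 where ITeye founder @Robbin gave his opinion on clustering Quartz in the 8th reply.

Later, someone summarized three Quartz clustering approaches: http://www.iteye.com/topic/114965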

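1. Start a standalone Job Server to run jobs, not deployed in the web container. When another web node needs to start an asynchronous task, it notifies the Job Server by some means (DB, JMS, Web Service, etc.). On receiving the notification, the Job Server loads the task into its own task queue.

2. Split out a separate job server that runs a Spring + Quartz application dedicated to starting tasks. Add Hessian to the job server to obtain the business interfaces, so that the job server can call business operations in the web container. The tasks are therefore still actually executed by the Tomcat instances in the cluster. After starting a scheduled task, the job server calls the business operation on each address in turn (much as Apache dispatches to Tomcat), so that different scheduled tasks run on different nodes, which reduces the load on any single node.

3. Quartz itself also supports clustering. In this approach every node in the cluster runs Quartz and uses state recorded in the database to determine whether an operation is already being executed, which requires that the clocks of all nodes in the cluster be synchronized. Because every node runs the application, every node also needs its own thread pool for Quartz.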

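Overall, the first approach executes tasks on a separate server, which greatly limits what the tasks can do: accessing resources that live in the web environment is very cumbersome. On the other hand, centralized management sidesteps, at the architectural level, the synchronization problems of a distributed environment. The second approach lightens the job server of the first: it only sends invocation requests and does not execute tasks itself. This solves the problem that a standalone server cannot reach the web environment, and it allows round-robin over the nodes, which balances load effectively. The third approach is the clustering scheme Quartz supports natively. Its architecture is fully distributed, with no central management. Quartz uses database locks and status fields to ensure that multiple nodes do not acquire the same task twice, and it has load-balancing and failover mechanisms, trading a small amount of redundancy for high availability (HA) and high reliability. (In my view this resembles how git works: a distributed, redundant design traded for reliability and speed.)

This article studies the mechanisms Quartz has built to solve problems in distributed task scheduling, such as preventing duplicate execution and balancing load. It follows the order of the scheduling flow and uses the source code to explain the principles.

For Quartz configuration and concrete usage, see another article by the CRM project team: "Summary of Using Quartz Clustering in CRM".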

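Quartz cluster architecture

The distributed architecture of Quartz is shown in the figure above: the database is the hub for the schedulers on all nodes. No node is aware of the others; they communicate only indirectly, through the database.

In fact, Quartz's distribution strategy is a concurrency strategy that treats the database as the shared critical resource. Every node follows the same operating protocol, so operations on the database are executed serially, while schedulers with different names can run in parallel without affecting each other.

The communication diagram between components is as follows (*note: the main SQL statements are attached at the end of the article):

At runtime, the QuartzSchedulerThread class is the main actor and executes the scheduling flow in a loop. JobStore is the middle layer: it performs database operations according to Quartz's concurrency strategy and carries out the main scheduling logic. JobRunShellFactory is responsible for creating the JobRunShell that instantiates the job and runs it in the thread pool. LockHandler is responsible for acquiring the database locks in the LOCKS table.

The overall sequence of task scheduling in Quartz is roughly as follows:

Walking through the flow, it can be expressed as: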

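0. The scheduler thread's run()

1. Acquire the triggers that are due to fire

    1.1 Lock the TRIGGER_ACCESS row of the LOCKS table

    1.2 Read the JobDetail information

    1.3 Read the trigger information from the triggers table and mark it "acquired"

    1.4 Commit the transaction, releasing the lock

2. Fire the trigger

    2.1 Lock the TRIGGER_ACCESS row of the LOCKS table (triggersFired() passes LOCK_TRIGGER_ACCESS)

    2.2 Confirm the trigger's state

    2.3 Read the trigger's JobDetail information

    2.4 Read the trigger's Calendar information

    2.5 Update the trigger information

    2.6 Commit the transaction, releasing the lock

3. Instantiate and execute the Job

    3.1 Take a thread from the thread pool to execute the run method of JobRunShell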

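As you can see, the flow contains two similar phases: both update database tables, and both acquire a lock before the operation and release it afterwards. This rule can be regarded as the core idea behind Quartz's solution to the clustering problem.

Flowchart of the rule:

To elaborate: before a scheduler instance performs any database operation that involves distributed coordination, it must first obtain the row-level lock in the QRTZ2_LOCKS table that corresponds to the current scheduler. Once it holds the lock it can perform operations on other tables, and when the transaction commits, the row lock is released for other scheduler instances to acquire.

Every scheduler instance in the cluster follows this strict protocol, so for schedulers of the same name, database operations by the instances can only be serial, while schedulers with different names can still execute in parallel.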
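The rule above can be sketched in plain Java without a database. This is only an in-memory analogy, not Quartz code: the class RowLockTable and its methods are hypothetical, and a ReentrantLock per (scheduler name, lock name) pair stands in for a row of the LOCKS table. It shows that work for one scheduler name is serialized on its row while the row for a different scheduler name stays free.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical in-memory stand-in for the LOCKS table: one lock per (scheduler name, lock name) row. */
class RowLockTable {
    private final Map<String, ReentrantLock> rows = new ConcurrentHashMap<>();

    /** Runs the work while holding the row lock; unlocking in finally mimics the transaction ending. */
    <T> T executeInLock(String schedulerName, String lockName, Supplier<T> work) {
        ReentrantLock row = rows.computeIfAbsent(schedulerName + "/" + lockName, k -> new ReentrantLock());
        row.lock();             // analogous to SELECT ... FOR UPDATE on the row
        try {
            return work.get();  // the database work done under the lock
        } finally {
            row.unlock();       // analogous to commit or rollback releasing the row lock
        }
    }

    /** True if some thread currently holds the given row. */
    boolean isLocked(String schedulerName, String lockName) {
        ReentrantLock row = rows.get(schedulerName + "/" + lockName);
        return row != null && row.isLocked();
    }
}

class RowLockDemo {
    public static void main(String[] args) {
        RowLockTable locks = new RowLockTable();
        // While "schedA" holds its TRIGGER_ACCESS row, the row for "schedB" is untouched.
        boolean otherFree = locks.executeInLock("schedA", "TRIGGER_ACCESS",
                () -> !locks.isLocked("schedB", "TRIGGER_ACCESS"));
        System.out.println("schedB row free while schedA works: " + otherFree);
    }
}
```

In the real cluster the lock lives in the database, so it also serializes instances running in different JVMs, which an in-process lock cannot do.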

Next we dig into the source code and look at the details of Quartz cluster scheduling up close.

Scheduler instantiation

A minimal Quartz hello-world application looks like this:

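import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class HelloWorldMain {
    Log log = LogFactory.getLog(HelloWorldMain.class);

    public void run() {
        try {
            // Obtain the Scheduler object
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sch = sf.getScheduler();

            // Quartz 2.x builder API; HelloWorldJob is the application's own Job class
            JobDetail jd = JobBuilder.newJob(HelloWorldJob.class)
                    .withIdentity("HelloWorldJobDetail", Scheduler.DEFAULT_GROUP)
                    .build();
            // Fire once a minute, forever
            Trigger tg = TriggerBuilder.newTrigger()
                    .withIdentity("HelloWorldTrigger")
                    .withSchedule(SimpleScheduleBuilder.repeatMinutelyForever(1))
                    .build();

            sch.scheduleJob(jd, tg);
            sch.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        HelloWorldMain hw = new HelloWorldMain();
        hw.run();
    }
}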

As shown, initializing a scheduler requires obtaining an instance from the factory class:

SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sch = sf.getScheduler();

and then starting it:

sch.start();

Next, step into the getScheduler() method of StdSchedulerFactory:
public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        initialize();
    }
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    // Look up a scheduler instance in the "scheduler repository" by the SchedulerName configured in the properties
    Scheduler sched = schedRep.lookup(getSchedulerName());
    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    // Initialize the scheduler
    sched = instantiate();
    return sched;
}

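Stepping into the initialization call sched = instantiate(); reveals an initialization method of more than 700 lines, which involves

    • reading the configuration resources,
    • creating the QuartzScheduler object,
    • creating that object's run thread and starting it,
    • initializing important components such as JobStore, QuartzScheduler and DBConnectionManager.

At this point the scheduler is initialized. During initialization Quartz prepares the lock handler for the lock rows stored in the database for the current scheduler. In CRM these are the two LOCK_NAME values STATE_ACCESS and TRIGGER_ACCESS in the table QRTZ2_LOCKS. The relevant JobStore initialize() method: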
public void initialize(ClassLoadHelper loadHelper,
        SchedulerSignaler signaler) throws SchedulerConfigException {
    if (dsName == null) {
        throw new SchedulerConfigException("DataSource name not set.");
    }
    classLoadHelper = loadHelper;
    if (isThreadsInheritInitializersClassLoadContext()) {
        log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
        initializersLoader = Thread.currentThread().getContextClassLoader();
    }

    this.schedSignaler = signaler;
    // If the user hasn't specified an explicit lock handler, then
    // choose one based on CMT/Clustered/UseDBLocks.
    if (getLockHandler() == null) {

        // If the user hasn't specified an explicit lock handler,
        // then we *must* use DB locks with clustering
        if (isClustered()) {
            setUseDBLocks(true);
        }

        if (getUseDBLocks()) {
            if (getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                if (getSelectWithLockSQL() == null) {
                    // SQL that locks the row of the LOCKS table belonging to the current scheduler
                    String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                    getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
                    setSelectWithLockSQL(msSqlDflt);
                }
            }
            getLog().info("Using db table-based data access locking (synchronization).");
            setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
        } else {
            getLog().info(
                "Using thread monitor-based data access locking (synchronization).");
            setLockHandler(new SimpleSemaphore());
        }
    }
}

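When sch.start() is called, the scheduler does the following:

1. Notifies the listeners that startup is beginning

2. Starts the scheduler thread

3. Starts the plugins

4. Notifies the listeners that startup is complete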

public void start() throws SchedulerException {
    if (shuttingDown || closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }
    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    // Notify this scheduler's listeners that startup is beginning
    notifySchedulerListenersStarting();
    if (initialStart == null) {
        initialStart = new Date();
        // Start the scheduler's thread
        this.resources.getJobStore().schedulerStarted();
        // Start the plugins
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
    schedThread.togglePause(false);
    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    // Notify this scheduler's listeners that startup is complete
    notifySchedulerListenersStarted();
}

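The scheduling process

Once the scheduler is started, its thread is running and begins Quartz's main work: scheduling tasks.

As described earlier, scheduling a task roughly consists of three steps:

1. Acquire the triggers that are due to fire

2. Fire the trigger

3. Instantiate and execute the Job

The source code of the three phases is analyzed below.

QuartzSchedulerThread is the scheduler thread class. The three steps of the scheduling process all live in its run() method; see the code comments for the analysis: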

[Code listing: QuartzSchedulerThread.run()]

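The triggers the scheduler acquires on each pass are those due within the next 30 s, so it has to wait until about 2 ms before a trigger's fire time. While waiting, it must also handle the case where a newly added trigger is more urgent than the ones already acquired. That analysis is in the code comments and is not repeated here.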
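The waiting behaviour described above can be illustrated with two small functions. This is a simplified sketch, not the Quartz source: the class FireTimeWait, both method names, and the switchCostMillis parameter (an allowance for the cost of giving up the acquired batch and acquiring again) are assumptions made for illustration.

```java
/** Hypothetical helper illustrating the wait before firing; not taken from Quartz. */
class FireTimeWait {

    /** Milliseconds still to wait; the wait ends once about 2 ms or less remain before the fire time. */
    static long remainingWait(long fireTimeMillis, long nowMillis) {
        long untilFire = fireTimeMillis - nowMillis;
        return untilFire > 2 ? untilFire : 0;
    }

    /**
     * Decides whether a newly scheduled trigger is urgent enough to drop the acquired batch.
     * It must fire earlier than the batch, and there must be enough time left to switch.
     */
    static boolean shouldReacquire(long newFireTime, long batchFireTime, long nowMillis, long switchCostMillis) {
        boolean earlier = newFireTime < batchFireTime;
        boolean enoughTime = batchFireTime - nowMillis >= switchCostMillis;
        return earlier && enoughTime;
    }

    public static void main(String[] args) {
        long now = 0L;
        System.out.println("wait: " + remainingWait(1_000L, now) + " ms");
        System.out.println("re-acquire for earlier trigger: " + shouldReacquire(500L, 1_000L, now, 70L));
    }
}
```

The second check reflects a design trade-off: if the batch is about to fire anyway, releasing it for a slightly earlier trigger would cost more than it saves.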

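As long as the scheduler is running, it keeps executing the scheduling flow. Note that at the end of each pass the thread waits for a random period of time. This is Quartz's built-in load-balancing mechanism.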
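A randomized wait of this kind can be sketched as follows. The 30 s base matches the window mentioned in the text; the 7 s jitter range and the names IdleWait and randomizedIdleWait are assumptions chosen for the example, not values confirmed by this article.

```java
import java.util.Random;

/** Hypothetical sketch of a randomized idle wait used to de-synchronize cluster nodes. */
class IdleWait {
    static final long IDLE_WAIT_MILLIS = 30_000L; // 30 s acquisition window from the text
    static final int VARIABILITY_MILLIS = 7_000;  // assumed jitter range for this sketch

    /** Returns the base wait minus a random amount, so nodes do not wake at the same instant. */
    static long randomizedIdleWait(Random random) {
        return IDLE_WAIT_MILLIS - random.nextInt(VARIABILITY_MILLIS);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int node = 1; node <= 3; node++) {
            System.out.println("node " + node + " waits " + randomizedIdleWait(random) + " ms");
        }
    }
}
```

Because each node draws its own wait, the order in which nodes reach the lock row changes from pass to pass, which spreads the triggers over the cluster.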

The three steps are followed in detail below:

Acquiring triggers

The scheduler calls:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

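This looks up in the database the triggers that will fire within a certain time range. The parameters mean the following. Parameter 1: noLaterThan = now + 30000 ms, i.e. triggers that will fire within the next 30 s. Parameter 2: the maximum number to acquire, the smaller of the number of free threads in the thread pool and the configured batch size. Parameter 3: the time window, 0 by default; the program adds the window size to noLaterThan when selecting triggers. Each time Quartz fires a trigger it computes the trigger's next fire time and records it in the NEXT_FIRE_TIME column of the QRTZ2_TRIGGERS table. Comparing the current time in milliseconds with that column finds the triggers that will fire in the coming period. The lookup calls this method in the JobStoreSupport class: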
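The selection described by the three parameters can be sketched over an in-memory list. This is a simplified illustration under stated assumptions: TriggerRow and TriggerSelector are hypothetical names, the misfire condition of the real query is left out, and the ordering (earliest fire time first, then highest priority) follows the ORDER BY clause quoted in the appendix.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory row of the triggers table. */
class TriggerRow {
    final String name;
    final long nextFireTime;
    final int priority;
    final String state;

    TriggerRow(String name, long nextFireTime, int priority, String state) {
        this.name = name;
        this.nextFireTime = nextFireTime;
        this.priority = priority;
        this.state = state;
    }
}

class TriggerSelector {
    /** Picks WAITING triggers due by noLaterThan + timeWindow, earliest first, then highest priority. */
    static List<TriggerRow> selectCandidates(List<TriggerRow> table, long noLaterThan, int maxCount, long timeWindow) {
        return table.stream()
                .filter(t -> "WAITING".equals(t.state))
                .filter(t -> t.nextFireTime <= noLaterThan + timeWindow)
                .sorted(Comparator.comparingLong((TriggerRow t) -> t.nextFireTime)
                        .thenComparing(Comparator.comparingInt((TriggerRow t) -> t.priority).reversed()))
                .limit(maxCount)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long now = 0L;
        List<TriggerRow> table = Arrays.asList(
                new TriggerRow("a", now + 1_000L, 5, "WAITING"),
                new TriggerRow("b", now + 1_000L, 9, "WAITING"),
                new TriggerRow("c", now + 40_000L, 5, "WAITING"),   // outside the 30 s window
                new TriggerRow("d", now + 500L, 1, "ACQUIRED"));    // already claimed elsewhere
        for (TriggerRow t : selectCandidates(table, now + 30_000L, 5, 0L)) {
            System.out.println(t.name);
        }
    }
}
```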

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

    String lockName;
    if (isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    // ... exception-handling callback
                }
            });
}

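The key point of this method is that it calls executeInNonManagedTXLock(), which is given a lock name and two callbacks. The lock is obtained when execution begins and is released when the transaction commits after the method finishes. Underneath, the method uses a FOR UPDATE statement to take a row-level lock in the database, which guarantees that while it is executing, any other scheduler trying to acquire triggers waits for this scheduler to release the lock. This method is the concrete implementation of the Quartz clustering strategy described earlier, and the same template method is used again when triggers are fired.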

public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

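To elaborate: before accessing database resources, Quartz first accesses the row with the relevant LOCK_NAME in the LOCKS table using FOR UPDATE, which locks that row. If the row is already locked, it waits. If not, it reads the triggers that meet the criteria and sets their state to STATE_ACQUIRED. If a trigger has already been set to STATE_ACQUIRED, it has been claimed by another scheduler instance and does not need to be claimed again, so the scheduler ignores it. This is where the indirect communication between scheduler instances shows up.

In the JobStoreSupport.acquireNextTrigger() method: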

int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);

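Finally the lock is released. If another scheduler is queued up to acquire triggers, it goes through the same steps. This mechanism ensures that a trigger is never acquired twice. With this algorithm, under normal operation a considerable share of the triggers a scheduler reads on each pass will already have been marked as acquired.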
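The claim step is a conditional state change: move a trigger from WAITING to ACQUIRED only if it is still WAITING, and use the number of updated rows to tell whether the claim succeeded. A minimal in-memory sketch, assuming a ConcurrentMap in place of the triggers table (the class TriggerStateTable is hypothetical; only the method name echoes updateTriggerStateFromOtherState from the source above):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the TRIGGER_STATE column of the triggers table. */
class TriggerStateTable {
    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void put(String triggerKey, String state) {
        states.put(triggerKey, state);
    }

    String get(String triggerKey) {
        return states.get(triggerKey);
    }

    /**
     * Mimics UPDATE ... SET TRIGGER_STATE = newState WHERE key = ? AND TRIGGER_STATE = oldState.
     * Returns the number of "rows" updated: 1 if the claim succeeded, 0 otherwise.
     */
    int updateTriggerStateFromOtherState(String triggerKey, String newState, String oldState) {
        return states.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerStateTable table = new TriggerStateTable();
        table.put("HelloWorldTrigger", "WAITING");
        System.out.println("first claim rows:  " + table.updateTriggerStateFromOtherState("HelloWorldTrigger", "ACQUIRED", "WAITING"));
        System.out.println("second claim rows: " + table.updateTriggerStateFromOtherState("HelloWorldTrigger", "ACQUIRED", "WAITING"));
    }
}
```

A scheduler that gets 0 back simply skips the trigger, which is the "ignore triggers already claimed" behaviour described above.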

This completes trigger acquisition.

Firing the trigger:

QuartzSchedulerThread, line 336:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

This calls the triggersFired() method of the JobStoreSupport class:

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                            TriggerFiredBundle bundle = triggerFired(conn, trigger);
                            result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch (RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }
                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                @Override
                public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                    // ... exception-handling callback
                }
            });
}

Here the Quartz operating protocol appears again: executeInNonManagedTXLock() fires the triggers while holding the lock. The details of firing are as follows:

protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)
    throws JobPersistenceException {
    JobDetail job;
    Calendar cal = null;
    // Make sure trigger wasn't deleted, paused, or completed...
    try { // if trigger was deleted, state will be STATE_DELETED
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }
    try {
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) { return null; }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(),
                    STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }
    if (trigger.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) { return null; }
    }
    try {
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: "
                + e.getMessage(), e);
    }
    Date prevFireTime = trigger.getPreviousFireTime();
    // call triggered - to update the trigger's next-fire-time state...
    trigger.triggered(cal);
    String state = STATE_WAITING;
    boolean force = true;

    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: "
                            + e.getMessage(), e);
        }
    }

    if (trigger.getNextFireTime() == null) {
        state = STATE_COMPLETE;
        force = true;
    }
    storeTrigger(conn, trigger, job, true, state, force, false);
    job.getJobDataMap().clearDirtyFlag();
    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}

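This method does the following:

1. Gets the trigger's current state

2. Reads the Job information referenced by the trigger through its JobKey

3. Marks the trigger as fired (the fired-trigger record is set to STATE_EXECUTING)

4. Fires the trigger, taking the calendar information into account, which involves several state updates

5. Updates the trigger's information in the database, including its next fire time and its new state: STATE_WAITING by default, STATE_BLOCKED if the job disallows concurrent execution, or STATE_COMPLETE if there is no next fire time

6. Returns TriggerFiredBundle, the data transfer class that carries the result of firing the trigger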
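The state decision in step 5 can be isolated into a small pure function. This is a sketch that mirrors the branch order of the triggerFired() source above; the class NextStateRule and its method are hypothetical, and the states are plain strings rather than the Quartz constants.

```java
/** Hypothetical helper isolating how the stored trigger state is chosen after firing. */
class NextStateRule {

    /**
     * @param concurrentExecutionDisallowed whether the job forbids concurrent execution
     * @param nextFireTime the trigger's next fire time in milliseconds, or null if it will not fire again
     */
    static String stateAfterFiring(boolean concurrentExecutionDisallowed, Long nextFireTime) {
        String state = "WAITING";              // default: wait for the next fire time
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED";                 // block until the running job instance finishes
        }
        if (nextFireTime == null) {
            state = "COMPLETE";                // no further firings, checked last so it wins
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, 60_000L));
        System.out.println(stateAfterFiring(true, 60_000L));
        System.out.println(stateAfterFiring(true, null));
    }
}
```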

 

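After this method returns, the firing of the trigger is essentially complete. Control returns to executeInNonManagedTXLock, the method that enforces Quartz's operating protocol, and the database lock is released.

The trigger firing operation is complete.

Job execution:

Back in the thread class QuartzSchedulerThread, at line 353, all triggers have been fired and the job details are all in place.

QuartzSchedulerThread, line 368: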

 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
shell.initialize(qs);

 

A runnable RunShell is created for each Job and placed in the thread pool to run.
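The idea of wrapping each job in a runnable shell and handing it to a thread pool can be sketched with the standard java.util.concurrent classes. This is an analogy only: SimpleRunShell and RunShellDemo are hypothetical names, and Quartz uses its own ThreadPool interface rather than ExecutorService.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shell that runs one job and records its completion. */
class SimpleRunShell implements Runnable {
    private final Runnable job;
    private final AtomicInteger completed;

    SimpleRunShell(Runnable job, AtomicInteger completed) {
        this.job = job;
        this.completed = completed;
    }

    @Override
    public void run() {
        try {
            job.run();                      // execute the job body
        } finally {
            completed.incrementAndGet();    // bookkeeping that runs even if the job throws
        }
    }
}

class RunShellDemo {
    /** Submits one shell per job to a fixed pool and returns how many shells finished. */
    static int runAll(int jobCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobCount; i++) {
            pool.execute(new SimpleRunShell(() -> { /* job body would go here */ }, completed));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("shells finished: " + runAll(5, 2));
    }
}
```

Because the scheduler thread only submits the shell and returns, a long-running job occupies a pool thread without delaying the next scheduling pass.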

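Finally, the scheduler thread generates a random wait time and pauses briefly, which gives the schedulers on other nodes a chance to obtain the database resources. This is how Quartz achieves load balancing.

That completes one full scheduling pass, and the scheduler thread enters the next iteration of the loop.

Summary:

In short, Quartz's distributed scheduling strategy is an asynchronous strategy that treats the database as the shared critical resource. All schedulers follow an operating rule based on database locks, which guarantees that each operation is performed only once, while the asynchronous operation of multiple nodes keeps the service reliable. However, this strategy has its limitations. The official documentation says the following about Quartz's clustering features: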

Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers. 

The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

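As the documentation points out, the clustering feature works well for long-running or CPU-intensive jobs. With a large number of short jobs, however, all nodes compete for the database lock, so many threads end up waiting for the resource, and the situation gets worse as nodes are added.

Appendix:

The main SQL statements for the key steps in the communication diagram: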

3. SELECT * FROM QRTZ2_LOCKS WHERE SCHED_NAME = 'CRMscheduler' AND LOCK_NAME = 'TRIGGER_ACCESS' FOR UPDATE;
4. SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ2_TRIGGERS
   WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_STATE = 'WAITING'
   AND NEXT_FIRE_TIME <= '{timekey 30s later}'
   AND ( MISFIRE_INSTR = -1 OR ( MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= '{timekey now}' ) )
   ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC;
5. SELECT * FROM QRTZ2_JOB_DETAILS WHERE SCHED_NAME = 'CRMscheduler' AND JOB_NAME = ? AND JOB_GROUP = ?;
6. UPDATE QRTZ2_TRIGGERS SET TRIGGER_STATE = 'ACQUIRED'
   WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = '{triggerName}' AND TRIGGER_GROUP = '{triggerGroup}' AND TRIGGER_STATE = 'WAITING';
7. INSERT INTO QRTZ2_FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY)
   VALUES ('CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
8. commit;
12. SELECT * FROM QRTZ2_LOCKS WHERE SCHED_NAME = 'CRMscheduler' AND LOCK_NAME = 'TRIGGER_ACCESS' FOR UPDATE;
13. SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14. SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14. SELECT * FROM QRTZ2_JOB_DETAILS WHERE SCHED_NAME = 'CRMscheduler' AND JOB_NAME = ? AND JOB_GROUP = ?;
15. SELECT * FROM QRTZ2_CALENDARS WHERE SCHED_NAME = 'CRMscheduler' AND CALENDAR_NAME = ?;
16. UPDATE QRTZ2_FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, ENTRY_STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ?
    WHERE SCHED_NAME = 'CRMscheduler' AND ENTRY_ID = ?;
17. UPDATE QRTZ2_TRIGGERS SET TRIGGER_STATE = ?
    WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = '{triggerName}' AND TRIGGER_GROUP = '{triggerGroup}' AND TRIGGER_STATE = ?;
18. UPDATE QRTZ2_TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTRUCTION = ?, PRIORITY = ?, JOB_DATAMAP = ?
    WHERE SCHED_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
19. commit;

Original article: http://demo.netfoucs.com/gklifg/article/details/27090179