OFBiz執行后臺任務的類在org.ofbiz.service.job中。

JobPoller和JobInvoker是主要的兩個類,一個負責查詢可以執行的Job,另一個執行Job任務。Job類圖如下所示。



1.Job輪詢

創建JobManager時,會創建JobPoller的一個實例。JobPoller實現了Runnable接口,以此創建線程后
通過JobManager一直輪詢是否有Job需要執行,如果有獎將其放入隊列中。

 1     public synchronized void run() {
 2         try {
 3             // wait 30 seconds before the first poll
 4             java.lang.Thread.sleep(30000);
 5         } catch (InterruptedException e) {
 6         }
 7         while (isRunning) {
 8             try {
 9                 // grab a list of jobs to run.
10                 List<Job> pollList = jm.poll();
11                 //Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module);
12 
13                 for (Job job : pollList) {
14                     if (job.isValid()) {
15                         queueNow(job);
16                         //Debug.logInfo("Job [" + job.getJobId() + "] is queued", module);
17                     }
18                 }
19                 // NOTE: using sleep instead of wait for stricter locking
20                 java.lang.Thread.sleep(pollWaitTime());
21             } catch (InterruptedException e) {
22                 Debug.logError(e, module);
23                 stop();
24             }
25         }
26     }
27 

queueNow方法將要執行job放入到隊列中,如果隊列中的等待執行的job數量很多,那么就創建一定數量的線程執行這些job。

 1    public void queueNow(Job job) {
 
2         synchronized (run) {
 6             run.add(job);
 7         }
 8         if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);
 9         if (run.size() > pool.size() && pool.size() < maxThreads()) {
10             synchronized (pool) {
11                 if (run.size() > pool.size() && pool.size() < maxThreads()) {
12                     int calcSize = (run.size() / jobsPerThread()) - (pool.size());
13                     int addSize = calcSize > maxThreads() ? maxThreads() : calcSize;
14 
15                     for (int i = 0; i < addSize; i++) {
16                         JobInvoker iv = new JobInvoker(this, invokerWaitTime());
17                         pool.add(iv);
18                     }
19                 }
20             }
21         }
22     }

JobInvoker就是執行的線程,它從queue中取job并執行。JobInvoker線程不是一直運行下去,運行的時間長度超過一定的值(見serviceengine.xml中ttl的值)線程就會停止并從pool中刪除。JobInvoker的run方法中job.exec()執行具體的任務。


2.Job執行

Job類都有一個exec方法,用戶執行Job的service。如GenericServiceJob中的exec方法如下:

 1     public void exec() throws InvalidJobException {
 2         init();
 3 
 4         // no transaction is necessary since runSync handles this
 5         try {
 6             // get the dispatcher and invoke the service via runSync -- will run all ECAs
 7             LocalDispatcher dispatcher = dctx.getDispatcher();
 8             Map<String, Object> result = dispatcher.runSync(getServiceName(), getContext());
 9 
10             // check for a failure
11             boolean isError = ModelService.RESPOND_ERROR.equals(result.get(ModelService.RESPONSE_MESSAGE));
12             if (isError) {
13                  String errorMessage = (String) result.get(ModelService.ERROR_MESSAGE);
14                  this.failed(new Exception(errorMessage));
15             }
16 
17             if (requester != null) {
18                 requester.receiveResult(result);
19             }
20 
21         } catch (Throwable t) {
22             // pass the exception back to the requester.
23             if (requester != null) {
24                 requester.receiveThrowable(t);
25             }
26 
27             // call the failed method
28             this.failed(t);
29         }
30 
31         // call the finish method
32         this.finish();
33     }
34 

在執行service執行,有一個init方法,在PersistedServiceJob類中init方法主要是生成下一個執行的任務,如果有的話。也即是說每一個job是由當時執行的這個job生成的,根據是什么呢?主要是兩個變量:tempExprId和maxRecurrenceCount,init方法中:
    
 1     TemporalExpression expr = null;
 2     ……
 3 
 4     if (expr == null && UtilValidate.isNotEmpty(job.getString("tempExprId"))) {
 5             try {
 6                 expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, job.getString("tempExprId"));
 7             } catch (GenericEntityException e) {
 8                 throw new RuntimeException(e.getMessage());
 9             }
10         }
11 

TemporalExpressionWorker里面有一個makeTemporalExpression方法很重要,從這個方法可以知道怎么配置TemporalExpression實體數據了,當然要結合TemporalExpressions類,里面定義了各種配置的細節。

tempExprTypeId有如下幾種:

DateRange
DayInMonth
DayOfMonthRange
DayOfWeekRange
Difference
Frequency
Intersection
MonthRange
TimeOfDayRange
Union

比如如果希望服務只執行一次,可以如下配置:
    <TemporalExpression tempExprId="RUNONCE" tempExprTypeId="FREQUENCY" integer1="1" integer2="1"/>
    <JobSandbox jobId="CurrencyRateSynAll" jobName="Currency Rate SynAll" runTime="2010-02-26 09:38:00.000" serviceName="currencyRateSynAll" poolId="pool" runAsUser="system" tempExprId="RUNONCE" maxRecurrenceCount="0"/>

maxRecurrenceCount="0" 表示,不重復。tempExprTypeId="FREQUENCY" integer1="1" integer2="1"表示一年執行一次。所以總共執行一次就結束了。

每天都執行可以這樣配置:

<TemporalExpression tempExprId="MIDNIGHT_DAILY" tempExprTypeId="TIME_OF_DAY_RANGE" string1="20:00:00" string2="20:00:00"/>
    <JobSandbox jobId="MailNotification" jobName="Mail Notification Job" runTime="2010-02-25 18:00:00.000" serviceName="mailNotificantion" poolId="pool" runAsUser="system" tempExprId="MIDNIGHT_DAILY" maxRecurrenceCount="-1"/>

maxRecurrenceCount="-1"表示無限循環下去。tempExprId="MIDNIGHT_DAILY" tempExprTypeId="TIME_OF_DAY_RANGE" string1="20:00:00" string2="20:00:00"/>表示每天晚上八點執行。

每個月一次任務可以如下配置:

<TemporalExpression tempExprId="ONCEINMONTH" tempExprTypeId="FREQUENCY" date1="2010-02-26 11:05:00.000" integer1="2" integer2="1"/>
    <JobSandbox jobId="CurrencyRateSyn" jobName="Currency Rate Syn" runTime="2010-02-26 11:05:00.000" serviceName="currencyRateSyn" poolId="pool" runAsUser="system" tempExprId="ONCEINMONTH" maxRecurrenceCount="-1"/>

tempExprTypeId="FREQUENCY" date1="2010-02-26 11:05:00.000" integer1="2" integer2="1"表示每月一次,時間就是date1定義的時間,如果沒用定義date1,那么就是當前時間。

這里的配置相當靈活,好好掌握。