主要介紹如何周期性盡量實(shí)時(shí)地從RDBMS爬數(shù)據(jù)然后建索引,不涉及AOP或ORM Framework的listener方式。
先決條件:
- Lucene索引是從無到有的,一開始所有數(shù)據(jù)都是存儲(chǔ)在RDBMS(Oracle)中。
- 數(shù)據(jù)表有一列是updateTime或稱為lastModifiedTime用來存儲(chǔ)最后一次更新時(shí)間,并建有db索引
- 主表必須要有主鍵,這個(gè)主鍵也用來唯一確定一個(gè)Lucene document。
該策略大致可以分為以下幾個(gè)部分:
1.索引結(jié)構(gòu)
2.初始化索引
3.增量索引
4.補(bǔ)償操作
5.刪除檢測(cè)
6.備份
7.注意點(diǎn)
索引結(jié)構(gòu):
我們是做全文搜索的,所以db中的用戶需要全文搜索的字段值都會(huì)拼接成一個(gè)string存儲(chǔ)在lucene的一個(gè)字段中稱為content。其他的需要存儲(chǔ)的字段或者字段分析后的token是有意義單詞的我們稱為metadata,每個(gè)metadata是document的一個(gè)field。
從索引結(jié)構(gòu)看來,我們有一個(gè)主表,不同的主表OID的記錄對(duì)應(yīng)不同的lucene document。主表依賴的所有子表中的數(shù)據(jù)如果是需要做全文搜索的就需要append在一起然后用Ngram進(jìn)行分詞處理。有的數(shù)據(jù)是需要存儲(chǔ)的或有特殊用途的,比如訪問控制,就不分詞或者用特殊的analyzer分詞。
簡單的介紹一下,具體會(huì)在以后的文章中談到。
初始化索引
初始化索引,是索引從無到有的一個(gè)過程。也是系統(tǒng)第一次初始化時(shí)做的事情。初始化時(shí)我們會(huì)首先獲取當(dāng)前時(shí)間,然后將updateTime早于當(dāng)前時(shí)間的所有數(shù)據(jù)取出來。最后把當(dāng)前時(shí)間存儲(chǔ)到timeTrace.properties文件中。作為增量索引的起始依據(jù)。
取什么?
用戶預(yù)先在配置文件配置一個(gè)主SQL,通過這個(gè)sql我們?nèi)〕鲂枳鋈乃饕?/span>content和metadata。對(duì)于某些特定的metadata不希望被build到全文索引中的,可以單獨(dú)配置sql。
如何取,多線程分批取。我們會(huì)有一個(gè)線程池,每個(gè)分批都是作為該線程池的一個(gè)task被提交執(zhí)行。
· 查詢出當(dāng)前時(shí)間之前的記錄總數(shù)n
· 按照rownum分批。假設(shè)分批的大小是2000,則需要將rownum為0, 2000,4000,6000…..max(rownum)的所有OID都一次拿回來。
Select oid from x where rownum=0 or rownum=2000,………….or rownum=n
如果直接根據(jù)rownum分批取,會(huì)出現(xiàn)幻影讀的問題,因?yàn)?/span>rownum每次查詢都會(huì)發(fā)生變化,如果有新數(shù)據(jù)插入,改用當(dāng)時(shí)snapshot的OID去取,避免這個(gè)問題,因?yàn)?/span>OID是不變的。有人會(huì)說,這樣可能查到新插入的數(shù)據(jù),當(dāng)然是這樣的,但是查到新加入的不會(huì)有影響,新加入的也是遲早需要爬的,但是用rownum還會(huì)丟失數(shù)據(jù)。
· 根據(jù)拿回來的斷點(diǎn)的OID,分批取。比如令先前獲取的rownum為2000的oid是10231,拿回來的4000的oid是14215
前兩個(gè)分批的sql就是:
Select * from x where oid >= 0 and oid < 10231;
Select * from x where oid >=10231 and oid < 14215;
具體的sql和oid是哪張表的哪列都是在配置文件中預(yù)先配置的。
· 當(dāng)然每個(gè)task還要負(fù)責(zé)將從db中爬出的數(shù)據(jù)建索引,建完索引后提交對(duì)索引的改動(dòng)
· 所有task都跑完之后,主線程將一開始獲取的當(dāng)前時(shí)間更新到timeTrace.properties文件中。
當(dāng)中可能會(huì)出現(xiàn)問題,如果出現(xiàn)問題,就需要重新初始化,初始化之前會(huì)清除所有已建好的部分臟索引。因?yàn)楫?dāng)前時(shí)間沒有更新到timeTrace.properties文件中。我們測(cè)試下來百萬級(jí)的數(shù)據(jù)這個(gè)過程大概需要10分鐘。
增量索引
在初始化索引成功后,當(dāng)時(shí)的時(shí)間已經(jīng)被更新到timeTrace.properties。增量索引是從這個(gè)時(shí)間點(diǎn)開始定期地被觸發(fā)執(zhí)行,可以使用quartz來管理這個(gè)timing job。增量索引稱為incrementalIndexService,增量索引服務(wù)的不同任務(wù)調(diào)度之間需要同步執(zhí)行,用quartz的stateful job可以實(shí)現(xiàn),或者使用內(nèi)存,文件或DB鎖。
· 第一步,從timeTrace文件中獲取已爬數(shù)據(jù)的截至?xí)r間Tlast,獲取當(dāng)前時(shí)間Tcurrent。
· Select count(*) from X where updateTime > Tlast and updateTime <= Tcurrent, 結(jié)果記為n,如果n小于分批的大小就直接爬出這段的索引數(shù)據(jù)。如果n大于分批大小,就需要將這個(gè)n個(gè)結(jié)果分批.
· 這次我們按照時(shí)間段分批,如果n/2000 = 3但有余數(shù), 那就說明要分四批拿,將這個(gè)時(shí)間段Tcurrent-Tlast平均分為四段。每個(gè)線程處理其中的某段。
· 某線程將它負(fù)責(zé)的某段數(shù)據(jù)拿回來之后,首先判斷這個(gè)OID是否在index已經(jīng)存在,如果存在就說明在這個(gè)時(shí)間段里這條記錄是被用戶update過的,index也做相應(yīng)的update。如果這個(gè)OID在index中不存在,則說明這條記錄是新加入到db中的,index也做add操作。做完之后提交。
· 當(dāng)所有分批都完成之后,更新timeTrace文件,把時(shí)間更新為Tcurrent。
· 一旦有分批出現(xiàn)問題失敗,整個(gè)時(shí)間段就認(rèn)為是不成功的,需要重新爬一遍。
一般增量服務(wù)我們?cè)O(shè)置的間隔都小于1分鐘,因?yàn)樾枰贸鲎顚?shí)時(shí)的數(shù)據(jù),而且每次獲取數(shù)據(jù)的結(jié)束時(shí)間都是當(dāng)前時(shí)間。保證數(shù)據(jù)的實(shí)時(shí)性。
補(bǔ)償操作
補(bǔ)償操作在整個(gè)爬蟲策略中是最復(fù)雜的一個(gè)環(huán)節(jié)。采用增量索引看似天衣無縫,其實(shí)還是有風(fēng)險(xiǎn)的。因?yàn)橛涗浀?/span>db的updateTime往往都是有延遲的,一般情況下是java端的時(shí)間或是記錄寫入DB的時(shí)間,都早于commit時(shí)間,但一般數(shù)據(jù)庫的隔離級(jí)別都是read committed。只有在數(shù)據(jù)被提交后才可能被增量服務(wù)看到。這樣的話3點(diǎn)跑的增量服務(wù),先前的結(jié)束時(shí)間是2點(diǎn)58分。這時(shí)它需要獲取2.58到3.00之間的數(shù)據(jù),但是此時(shí)有可能java端正有一條記錄生成,它的updateTime是 2.59,但是它一直沒有commit,因?yàn)?/span>transaction的超時(shí)時(shí)間是10分鐘。悲劇了發(fā)生了,這條數(shù)據(jù)將永遠(yuǎn)不會(huì)被爬出來,除非遙遠(yuǎn)的將來有人再次更新它。因?yàn)檫@個(gè)時(shí)間段已經(jīng)被爬過了,按照增量服務(wù),它是永遠(yuǎn)不會(huì)再爬timeTrace文件中記錄的時(shí)間之前的數(shù)據(jù)的。
此時(shí),補(bǔ)償服務(wù)隆重登場(chǎng)。它存在的價(jià)值就是把所有可能被遺漏的數(shù)據(jù)都查出來。關(guān)鍵點(diǎn)就是要找出在補(bǔ)償服務(wù)運(yùn)行時(shí),哪個(gè)時(shí)間段的數(shù)據(jù)是可能被遺漏的而哪個(gè)時(shí)間段的數(shù)據(jù)又是永遠(yuǎn)不會(huì)被丟失的。那個(gè)永遠(yuǎn)不會(huì)丟失的時(shí)間段就沒必要再去管它。我們關(guān)心的只是可能被遺漏的那段時(shí)間段的數(shù)據(jù)。
我們來看個(gè)例子:
· 增量服務(wù)每1分鐘跑一次, 周期記為P(N) = 1
· Transaction Timeout時(shí)間3分鐘,記為TO
· 補(bǔ)償服務(wù)每x分鐘跑一次,P(C) = x >= P(N)
下圖的第一條時(shí)間軸T(N)是增量服務(wù)的,第二條是補(bǔ)償服務(wù)的T(C)。
對(duì)于補(bǔ)償服務(wù)我們需要確定每次的開始時(shí)間Ts,結(jié)束時(shí)間Te,周期P(C), 算法,初始化值,意外情況,優(yōu)化等方面。
結(jié)束時(shí)間Te
補(bǔ)償服務(wù)的目的是對(duì)增量索引進(jìn)行補(bǔ)償,所以它所補(bǔ)償?shù)臅r(shí)間區(qū)間一定是增量服務(wù)已經(jīng)處理過的。所以它的結(jié)束時(shí)間一定是timeTrace文件中最后一次增量服務(wù)記錄的時(shí)間我們記為Last(N). Last(N)之后的數(shù)據(jù)或者正在被增量服務(wù)處理或者沒有被增量服務(wù)處理,如果補(bǔ)償服務(wù)去涉及這些數(shù)據(jù),那肯定全是要補(bǔ)償?shù)?,但是增量服?wù)也會(huì)去處理,一個(gè)是會(huì)重復(fù)處理,一旦需要補(bǔ)償,我們會(huì)把這條記錄的所有數(shù)據(jù)都從DB端取過來,建索引,重復(fù)的代價(jià)也是很大的。為了避免這個(gè)代價(jià): Te < Last(N).
如果Te = Last(N), 我們就會(huì)拿到最新的需要補(bǔ)償?shù)臄?shù)據(jù),補(bǔ)償服務(wù)的延遲就最小。Te < Last(N), 每次都沒有不嘗到最新的數(shù)據(jù),遺漏數(shù)據(jù)被檢測(cè)到就會(huì)有延遲,只有等下一次補(bǔ)償服務(wù)觸發(fā)時(shí)才能被檢測(cè)出。
開始時(shí)間Ts
上圖中,transaction timeout之前的那個(gè)時(shí)間段,如果有數(shù)據(jù)生成,T(N) = 1, 要么在圖上標(biāo)出的[T(N) =1,T(N) = 4]之間被提交,要么transaction超時(shí)該數(shù)據(jù)也不會(huì)寫入到DB中。所以Last(N) – TO之前的數(shù)據(jù)在這一次補(bǔ)償服務(wù)的時(shí)候已經(jīng)是完全可見的,肯定都會(huì)被補(bǔ)償?shù)?。?duì)于下一次來說這塊也是不需要再被補(bǔ)償?shù)摹M耆梢妳^(qū)(針對(duì)下一次補(bǔ)償操作)必須滿足下面兩個(gè)條件:
· 被增量服務(wù)處理過并且更新已經(jīng)完全對(duì)本次補(bǔ)償服務(wù)可見
· 已經(jīng)被補(bǔ)償服務(wù)處理過
則下一次補(bǔ)償操作就不會(huì)再關(guān)心Last(N)-TO之前的數(shù)據(jù)了。我們把上一次補(bǔ)償服務(wù)記為Last(C), 而此次的增量服務(wù)記為Last(C Last(N)). 則下一次補(bǔ)償服務(wù)的開始時(shí)間<=Last(N)-TO.因?yàn)榇笥谶@個(gè)時(shí)間的所有數(shù)據(jù)都是需要被補(bǔ)償?shù)?。換個(gè)表達(dá)方式,此次的補(bǔ)償服務(wù)的開始時(shí)間是由上一次補(bǔ)償服務(wù)計(jì)算的得到的,為Last(C Last(N)) – TO. Ts <= Last(C Last(N)) – TO
同時(shí)我們需要注意的是,Last(N) – TO 到Last(N)每次補(bǔ)償服務(wù)是必須要檢測(cè)的,不然就會(huì)有遺漏,因?yàn)槲覀兗僭O(shè)了不可見區(qū),前提條件就是每次都會(huì)檢測(cè)這個(gè)區(qū)域。所以結(jié)束時(shí)間: Last(N) – TO < Te < Last(N).
開始時(shí)間是由上一次補(bǔ)償服務(wù)計(jì)算得到的,那這個(gè)值就需要保存下來。保存在文件中可以避免系統(tǒng)down掉后丟失。我們也會(huì)將這個(gè)時(shí)間值保存到timeTrace.properties的compensation屬性上。
算法
補(bǔ)償服務(wù)的算法主要目的就是比較出遺漏的數(shù)據(jù)。為了比較有無遺漏,我們需要把db中的數(shù)據(jù)和增量服務(wù)已經(jīng)爬過的數(shù)據(jù)進(jìn)行比較才知道。我們會(huì)在內(nèi)存中存放增量服務(wù)已經(jīng)爬過的數(shù)據(jù)的oid和updateTime,在內(nèi)存中存放是為了提高性能。每次補(bǔ)償服務(wù)運(yùn)行時(shí),也會(huì)把完全可見區(qū)從內(nèi)存中清除。
· 增量服務(wù)每次執(zhí)行后就會(huì)將爬出的數(shù)據(jù)的OID和updateTime保存在內(nèi)存中,內(nèi)存中有一棵二叉排序樹維護(hù)OID和updateTime的pair。排序的key是OID。二叉排序樹可以用TreeSet實(shí)現(xiàn)。
· 補(bǔ)償服務(wù)運(yùn)行時(shí),先創(chuàng)建兩個(gè)List一個(gè)用來存放需要update數(shù)據(jù)的oid,記為updateList,另一個(gè)用來存放add數(shù)據(jù)的oid,記為addList
· 接著從DB中取出Ts到Te之間所有數(shù)據(jù)的OID和updateTime,也是根據(jù)OID排序的。
· 計(jì)算下一次補(bǔ)償服務(wù)的開始時(shí)間,Next(Ts) = Last(N) –TO;
· 現(xiàn)在就是要比較兩個(gè)有序集合。樹的訪問者應(yīng)該寫成:
if(OID(C) == OID(N))
if(updateTime(C) > updateTime (N))
{
更新OID(N)的updateTime;
updateList.add(OID(N));
}
else{
addList.add(OID(N))
}
//清除完全可見區(qū),下一次補(bǔ)償服務(wù)開始時(shí)間之前的數(shù)據(jù)
if(updateTime(N) < Next(Ts)) {
tressSetIterator.remove();
}
· 將updateList中OID對(duì)應(yīng)的所有數(shù)據(jù)從db中獲取并update到索引中
· 將addList中OID對(duì)應(yīng)的所有的數(shù)據(jù)從db中獲取并add到索引中
· Commit索引,并將Next(Ts)記錄到updateTrace文件中
意外情況
補(bǔ)償服務(wù)很久沒有被調(diào)度
一般不會(huì)出現(xiàn),因?yàn)槲覀儠?huì)將增量服務(wù)和補(bǔ)償服務(wù)的線程優(yōu)先級(jí)設(shè)為相同的。應(yīng)該會(huì)被分時(shí)處理。如果很久沒有不會(huì)被調(diào)度,正確性是可以保證的,因?yàn)殚_始時(shí)間都是記錄在文件中的,如果一直沒有跑,只是一下子補(bǔ)償?shù)臅r(shí)間段很長,并不會(huì)丟失補(bǔ)償?shù)臅r(shí)間段。但是不排除內(nèi)存溢出的風(fēng)險(xiǎn),因?yàn)榇鎯?chǔ)在內(nèi)存中的treeset在這種情況下會(huì)很大。在treeset很大時(shí),我們可以檢測(cè),如果超過一定的節(jié)點(diǎn)數(shù),就可以將treeset序列化到一個(gè)internal索引中,下次取出來時(shí)也是有序的。甚至可以分塊取出比較。
Server突然shutdown
Server shutdown突然shutdown,線程被interrupt掉,沒有執(zhí)行完,內(nèi)存中的樹也沒了。這時(shí)就需要每次啟動(dòng)時(shí),這個(gè)時(shí)間段內(nèi)的所有數(shù)據(jù)都會(huì)認(rèn)為是需要add到索引的,這樣就會(huì)出問題。所以需要提前檢測(cè),每次系統(tǒng)啟動(dòng)時(shí),補(bǔ)償服務(wù)需要把這段時(shí)間內(nèi)的所有記錄的OID和updateTime從db中獲取,直接和索引中的進(jìn)行比較,比較效率要低一些。但也不會(huì)出現(xiàn)數(shù)據(jù)丟失的情況。
初始化值
系統(tǒng)初始化時(shí),補(bǔ)償服務(wù)初始化Ts(被掃描數(shù)據(jù)的起始時(shí)間)是,初始化索引的當(dāng)前時(shí)間-TO。 而補(bǔ)償服務(wù)本身的開始時(shí)間是在增量服務(wù)開始之后。之后多少可以調(diào)。
優(yōu)化
優(yōu)化的重點(diǎn)放在了以下幾個(gè)方向。
· 對(duì)DB壓力
· 補(bǔ)償延遲
· 消耗內(nèi)存的大小
· 比較次數(shù)
以上選項(xiàng)之間有的都是矛盾的,比如說補(bǔ)償延遲要小,則補(bǔ)償服務(wù)的P(C)就要小,則查詢DB的次數(shù)就增加,對(duì)DB壓力就增大。
所以針對(duì)不同的使用情況,比如DB資源,延遲的可接受程度,應(yīng)用服務(wù)器資源等,我們可能需要采用不同的策略,這就要我們的補(bǔ)償策略可調(diào)。
為了可調(diào),我們不僅使一些參數(shù)可以配置,而且引入了分級(jí)補(bǔ)償服務(wù)的方案。在分級(jí)方案中,如果分n級(jí),則n-1級(jí)的TO輸入值推薦和P(C)是相同的,但也是可調(diào)的。
舉個(gè)例子:一個(gè)三級(jí)補(bǔ)償服務(wù),
第一級(jí):為了使補(bǔ)償?shù)难舆t最小,極端情況下我們可以采用和增量服務(wù)相同的周期假設(shè)為1分鐘,此時(shí)TO的輸入值也是周期值。此級(jí)的啟動(dòng)時(shí)間也是初始化當(dāng)前時(shí)間記為Tinitial+P(N).
第二級(jí): 業(yè)務(wù)場(chǎng)景中絕大多數(shù)事務(wù)都是在3分鐘內(nèi)完成的,如果TO是3分鐘,基本上絕大多數(shù)事務(wù)都可以及時(shí)的補(bǔ)償?shù)?。此?jí)的啟動(dòng)時(shí)間是Tinitial+3
第三級(jí):也是最后一級(jí),在App Server中配置的真正的TO是10分鐘,為了保證正確性,TO的輸入值一定要是10分鐘,因?yàn)橹恍枰WC正確性所以它的頻率也不需要太頻繁周期也設(shè)為10分鐘。從前文中可知Last(N) – TO < Te < Last(N). 此級(jí)沒有必要多實(shí)時(shí),所以Te就取最小值=Last(N)-TO.
我們將這個(gè)三級(jí)策略和一級(jí)策略進(jìn)行比較,我們假設(shè)一級(jí)策略的周期為2分鐘。假設(shè)整個(gè)時(shí)間段是10分鐘。
比較項(xiàng)
|
一級(jí)(2)
|
三級(jí)(1, 3, 10)
|
DB訪問次數(shù)
|
5
|
14
|
延遲
|
2
|
<2
|
內(nèi)存
|
11分鐘數(shù)據(jù)的OID 和updateTime
|
17
|
訪問DB的數(shù)據(jù)量
|
12×5=60
|
10+6×10/3+2×10=50
|
比較次數(shù)
|
60
|
50
|
在這個(gè)分級(jí)策略中級(jí)數(shù)n, 每一級(jí)的P(c), TO, Te都是可調(diào)的,但需要注意最后一級(jí)的TO是不可調(diào)的必須等于真正的transaction timeout時(shí)間,Te的取值范圍是[Last(N)-TO, Last(N)]。
調(diào)優(yōu)的依據(jù)是我們會(huì)記錄每次補(bǔ)償操作的歷史記錄,比如每次補(bǔ)償成功的個(gè)數(shù),補(bǔ)償運(yùn)行的開始,結(jié)束時(shí)間等。
刪除檢測(cè)
增量索引服務(wù)只是負(fù)責(zé)update和add的檢測(cè),它并不判定索引中document對(duì)應(yīng)的記錄在DB中是否已經(jīng)被刪除,索引中會(huì)積累很多在DB中已經(jīng)被清除的數(shù)據(jù)。這些document也要及時(shí)地從索引中刪除。所以會(huì)有一個(gè)定期的刪除檢測(cè)服務(wù),檢測(cè)出那些在索引中有,而在DB中已經(jīng)被物理刪除的記錄。
刪除檢測(cè)服務(wù)的步驟:
l 從索引中分批取出所有OID,根據(jù)OID排序
l 用每個(gè)分批的最小值和最大值到DB中取出此OID段DB中存在(沒有被刪除)的所有OID,也根據(jù)OID排序
Select OID from X where oid>12001 and oid<24100 order by oid;
l 將索引中查處的OID 有序集合和DB中獲得的OID有序集合進(jìn)行對(duì)比,如果DB中沒有索引中有的就添加到deletedList中.
l 把deletedList中的所有記錄對(duì)應(yīng)document從索引中刪除
對(duì)于軟刪除,它們的狀態(tài)屬性active如果已經(jīng)被爬到索引中,直接從索引中選擇出那些active=0的document刪除,如果沒有,可以將刪除檢測(cè)的sql語句改成
Select OID from X where active = 0 and oid>12001 and oid<24100 order by oid;
其他步驟同上面硬刪除的部分
備份
備份時(shí)需要注意不僅要備份最后一次commit之前的所有索引,而且需要備份timeTrace文件. 恢復(fù)后只需要從timeTrace的時(shí)間開始爬就可以了.
注意點(diǎn)
主表updateTime沒有更新
有時(shí)候,業(yè)務(wù)邏輯更新了子對(duì)象,比如JobOrder對(duì)象包含了很多個(gè)Container對(duì)象,一個(gè)JobOrder對(duì)應(yīng)一個(gè)Lucene Document,當(dāng)Container對(duì)象更新時(shí),它并沒有更新JobOrder的updateTime,只是更新了Container的updateTime。這也沒關(guān)系,我們?cè)僭隽糠?wù)和補(bǔ)償策略中同時(shí)也會(huì)查出子表updateTime在當(dāng)前時(shí)間段的所有主表數(shù)據(jù)。
但container如果有刪除,就必須約定application必須要update主表的updateTime。否則用戶就會(huì)搜出他本不能訪問的被刪除的container。