首先要說明的是,這部分內(nèi)容和第一篇不同,必須對(duì)照代碼看才會(huì)理解其中的含義,光看設(shè)計(jì)實(shí)現(xiàn)會(huì)比較難懂其中所說的細(xì)節(jié)點(diǎn)。代碼:https://github.com/cenwenchu/beatles
IComponent:

做系統(tǒng)就像搭積木一樣,這些組件最后都會(huì)拼裝起來,而積木往往由于內(nèi)部機(jī)制可定制化需要config,同時(shí)組合在一起的積木往往會(huì)有一個(gè)傳遞的Config(可以認(rèn)為是靜態(tài)的Context)。
INode:

Node的設(shè)計(jì)比較簡(jiǎn)單,自身是Runnable的單線程循環(huán)體,內(nèi)置一個(gè)單線程事件監(jiān)聽分
發(fā)器。Node的主線程主要負(fù)責(zé)該Node自身產(chǎn)生的事件處理(常規(guī)已知事件處理):Master就是維護(hù)任務(wù)列表狀態(tài),并根據(jù)任務(wù)執(zhí)行情況做一些Action,Slave就是重復(fù)的獲取,執(zhí)行任務(wù),返回結(jié)果。而Node中的事件監(jiān)聽器主要用于外部消息驅(qū)動(dòng)事件處理(偶發(fā)性事件處理),例如Master收到Slave的請(qǐng)求,外部要求導(dǎo)出載入中間結(jié)果等。這里會(huì)發(fā)現(xiàn)采用的是單線程阻塞檢查獲取事件:
1.多線程并發(fā)檢查事件,對(duì)于事件承載者(隊(duì)列)就要求做好并發(fā)控制,也就是最終在獲取事件的過程中依然是串行化,所以大部分事件處理框架對(duì)于事件檢查都采用單線程,簡(jiǎn)單高效。
2.單線程可以用于檢查事件,但執(zhí)行事件會(huì)采用多線程或者主線程直接處理,取決于事件執(zhí)行速度和可靠性(外部依賴),如果事件可以快速執(zhí)行(無外部依賴,邏輯簡(jiǎn)單),則采用檢查線程直接處理(NIO中對(duì)于連接建立和銷毀直接在主線程中處理掉,這里Master對(duì)于獲取任務(wù)事件的前半段采用直接處理),如果事件消耗時(shí)間較久,或者依賴與外部系統(tǒng)穩(wěn)定性和處理情況,則需要采用多線程異步處理(這里很多都是讓內(nèi)部組件來保證方法執(zhí)行的快速返回,例如JobExporter的所有方法等處理都是內(nèi)部線程異步完成,對(duì)外接口快速返回),如果必須要有結(jié)果回執(zhí),那么可以采用回調(diào)模式或者直接提交新事件(帶有上下文可以接著上次處理)到事件處理引擎。
3.另一種方式會(huì)選擇將事件分開放置來提高處理效率,例如TimeOut事件和普通外部激發(fā)事件,注意盡量避免通過輪詢對(duì)象狀態(tài)來判斷事件發(fā)生,除了Timeout必須這么做,其他盡量通過對(duì)對(duì)象狀態(tài)變更操作內(nèi)置事件產(chǎn)生器來創(chuàng)建真實(shí)事件,這樣事件處理者只需要阻塞等待事件即可,系統(tǒng)內(nèi)狀態(tài)規(guī)模增大對(duì)于事件處理時(shí)間復(fù)雜度還是O(1)。對(duì)于Timeout這類事件后續(xù)介紹SlaveConnector中有更詳細(xì)介紹,至此略過。那是否需要將不同業(yè)務(wù)處理放置在不同的隊(duì)列交由不同的單線程處理,取決于系統(tǒng)事件產(chǎn)生速度,就好比NIO中可以起多個(gè)Selector來處理,由于起多個(gè)單線程守護(hù)到隊(duì)列如果利用率不高,其實(shí)對(duì)于系統(tǒng)來說也是一種負(fù)擔(dān),所以可以做成可配置的方式提供給外部(beatles中是沒有提供配置,就一個(gè)線程,因?yàn)楸旧聿⒉皇谴蟛l(fā)的web前端系統(tǒng),接上千個(gè)slave的話消息量分布也不會(huì)非常密集,畢竟任務(wù)分析本身需要消耗時(shí)間)。
MasterNode中有兩個(gè)組件:JobManager和MasterConnector,一個(gè)負(fù)責(zé)上層業(yè)務(wù)處理,一個(gè)負(fù)責(zé)消息通信,在MasterNode運(yùn)作的時(shí)候,兩者其實(shí)需要協(xié)調(diào)工作,例如MasterConnector可能會(huì)收到消息,需要提交給JobManager處理并獲得結(jié)果返回。為了實(shí)現(xiàn)內(nèi)部組件不會(huì)相互依賴(MasterNode內(nèi)部成為網(wǎng)狀結(jié)構(gòu)),采用MasterNode作為中間消息傳遞者,通過事件或回調(diào)方式相互驅(qū)動(dòng),同時(shí)利用上下文(將Channel作為Event的一部結(jié)構(gòu),用于后續(xù)消息返回)來傳遞一些環(huán)境信息。需要注意的是,這種解耦的做法勢(shì)必帶來性能的下降,因此可以和前面提到的事件處理為多線程還是單線程一樣,對(duì)于消息機(jī)制的依賴也不要盲從,按需使用,例如Connector通過事件提交給MasterNode,MasterNode接收事件后調(diào)用JobManager處理,處理后的結(jié)果也可以利用事件機(jī)制反向驅(qū)動(dòng)Master去調(diào)用Connector,但也可以直接將MasterNode植入JobManager,反向利用代理模式來直接處理,這里關(guān)鍵看你是否需要釋放掉你當(dāng)前的線程,讓任務(wù)異步去做,而當(dāng)前線程可以回收去做更多的處理,帶來的是線程切換和事件驅(qū)動(dòng)的消耗。不過總體上來說讓組件的宿主來完成交互,能夠減少模塊間依賴帶來的耦合性和復(fù)雜度。
FileJobExporter
這個(gè)類主要用于文件輸出,但在輸出部分的代碼中有lazymerge的部分,所謂的lazy merge指的是部分entry<key,value>的結(jié)果是依賴于處理后的部分結(jié)果而得到,例如成功率這個(gè)指標(biāo)就是用成功數(shù)/總數(shù)。作為分析系統(tǒng)來說,如果成功數(shù)的<key,value>需要長期保存,總數(shù)的<key,value>需要長期保存,那是否需要在最終產(chǎn)出報(bào)表以前就將成功率的<key,value>計(jì)算并保存在內(nèi)存中呢?其實(shí)大可不必,不僅浪費(fèi)了cpu資源,也浪費(fèi)了大量的內(nèi)存資源,同時(shí)slave傳遞給master還會(huì)使得網(wǎng)絡(luò)io消耗增大。在beatles中認(rèn)為export就是最后的一步,因此在這個(gè)時(shí)候做計(jì)算和導(dǎo)出。在我們很多系統(tǒng)中,考慮一下很多中間結(jié)果是否需要輸出,還是保留在最后一步輸出(并不是保留在最后一步一定好,取決于代價(jià),如果最后一步有大量的計(jì)算要做,那么可以用內(nèi)存換機(jī)算,提早計(jì)算來減緩最后導(dǎo)出時(shí)的壓力,如果導(dǎo)出時(shí)計(jì)算不大,而系統(tǒng)整體處理內(nèi)存資源緊張,那么就滯后處理)。衍生開來很多時(shí)候,需要考慮重復(fù)計(jì)算帶來的成本和節(jié)省內(nèi)存帶來的收獲誰更有利,如果計(jì)算節(jié)點(diǎn)分散且規(guī)模巨大,則可以靠慮利用外部計(jì)算能力來減少集中式處理的代價(jià)(好比很多前端處理的結(jié)果可以滯后到客戶端處理而不是服務(wù)端集中處理,開放平臺(tái)的數(shù)據(jù)序列化推后到業(yè)務(wù)方集群處理而不是開放平臺(tái)統(tǒng)一處理)
JobManager
由于MasterNode中是單線程調(diào)用,因此對(duì)于任務(wù)狀態(tài)變更變得非常簡(jiǎn)單(無需并發(fā)控制和原子操作),但由于MasterNode將來還是可擴(kuò)展為多個(gè)線程處理,因此暫時(shí)保留原子操作的處理模式。
1. 對(duì)于對(duì)象狀態(tài)管理,如果對(duì)象層次比較多,盡量扁平化處理,就好比把TaskStatus直接保存,有利于檢查和原子操作,帶來的問題就是另一部分對(duì)象的狀態(tài)同步變更(Task中的狀態(tài)),其實(shí)簡(jiǎn)單來說就是兩個(gè)數(shù)據(jù)結(jié)構(gòu)修改要做到事務(wù)性,做法比較簡(jiǎn)單,細(xì)粒度的原子操作模擬鎖爭(zhēng)奪,例如要修改Task的Status首先要并發(fā)的修改TaskStatus的數(shù)據(jù)(if (statusPool.replace(taskId, JobTaskStatus.DOING, JobTaskStatus.DONE)),如果修改成功,才可以修改原始對(duì)象內(nèi)的數(shù)據(jù)。其實(shí)如果是單線程都不需要并發(fā)控制(因?yàn)椴l(fā)的模式還是有些消耗的)。
2. 事件驅(qū)動(dòng)模型中很重要一點(diǎn)就是事件狀態(tài)必須在所有必要操作后再改變(即創(chuàng)建事件),例如:早一個(gè)版本中,Master收到Slave返回結(jié)果時(shí),將會(huì)把結(jié)果設(shè)置到Master的某一個(gè)Task的result屬性中,同時(shí)改變Task的狀態(tài)為done,這兩個(gè)動(dòng)作就必須保持一定的順序,也就是先要把內(nèi)容設(shè)置進(jìn)去,然后再改變狀態(tài),因?yàn)槿绻雀淖儬顟B(tài),外部事件處理線程如果發(fā)現(xiàn)狀態(tài)已經(jīng)改變,又沒有鎖保證結(jié)果放進(jìn)去以前不能處理這個(gè)事件,就會(huì)發(fā)現(xiàn)事件開始被處理了,但是內(nèi)容還是錯(cuò)過了處理,出現(xiàn)線程并發(fā)問題。這點(diǎn)在這個(gè)版本的源碼注釋上面有點(diǎn)問題,后續(xù)修改掉它。
3. 在主流程上有一個(gè)方法mergeAndExportJobs,用于檢查Job內(nèi)部的Tasks完成狀態(tài),決定是否合并或者導(dǎo)出結(jié)果,首先受限制于JobManager主流程是單線程處理,同時(shí)內(nèi)部Tasks狀態(tài)隨時(shí)會(huì)變,因此要求主流程的所有操作和檢查都必須非阻塞,保證處理的即時(shí)性,但如果這個(gè)方法里面的所有操作都變成另起線程異步處理的話,就同樣會(huì)發(fā)生上面我談過的事件檢查多線程模式最終還是會(huì)并發(fā)控制下變成串行化,效率不升反降,因此采用同一業(yè)務(wù)性數(shù)據(jù)處理守護(hù)進(jìn)程唯一性的方式(其實(shí)簡(jiǎn)單來說就是在這里Master中管理多個(gè)Job,多個(gè)Job其實(shí)就好比多個(gè)事件隊(duì)列,因此必須并行處理,否則會(huì)有互相影響的風(fēng)險(xiǎn),但是單個(gè)Job的處理可以只有一個(gè)守護(hù)線程處理,因此對(duì)Job加事件鎖,保證不同Job之間同一個(gè)事件并行,同一個(gè)job不同事件并行(這里由于都是順序化的,雖然并行了,但還是要等待上一個(gè)事件完成后才會(huì)修改內(nèi)部狀態(tài)繼續(xù)往下走))
4. 在第一篇里面說到,這個(gè)框架對(duì)于任務(wù)執(zhí)行異常的處理十分簡(jiǎn)單,事先規(guī)定好單個(gè)任務(wù)執(zhí)行的最長可接受時(shí)間,如果到了時(shí)間尚未獲得反饋,就認(rèn)為出現(xiàn)問題,任務(wù)重置可以接受下一個(gè)計(jì)算節(jié)點(diǎn)的處理請(qǐng)求。(結(jié)果誰先返回就用誰的)這里其實(shí)要注意兩點(diǎn):任務(wù)時(shí)間可評(píng)估是基于任務(wù)切分粒度夠細(xì),其實(shí)很多時(shí)候可以考慮通過任務(wù)細(xì)化來降低任務(wù)出現(xiàn)問題解決的復(fù)雜度,同時(shí)也可以降低計(jì)算節(jié)點(diǎn)重新做任務(wù)的代價(jià)。另一方面需要設(shè)置重置次數(shù)透明化,保證如果任務(wù)本身有問題(例如數(shù)據(jù)來源出現(xiàn)問題),不會(huì)使得所有的計(jì)算節(jié)點(diǎn)陷入單個(gè)任務(wù)處理死循環(huán)。
5. 合并數(shù)據(jù)的代碼優(yōu)化:
A. Master合并時(shí)每一個(gè)Job只有一個(gè)主干,也就是最后job的所有Task Result都必須合并到這個(gè)主干,假設(shè)這是個(gè)svn主干,可以想象多個(gè)人(多線程)是無法并行合并的。那么當(dāng)主線程在A時(shí)刻發(fā)現(xiàn)有4個(gè)結(jié)果需要合并的時(shí)候,它開始把4個(gè)結(jié)果合并到主干,合并的過程中可能又來了3個(gè)結(jié)果,那么這三個(gè)結(jié)果就必須等待下一輪的合并開始,此時(shí)這三個(gè)結(jié)果耗費(fèi)的內(nèi)存就會(huì)增加系統(tǒng)的負(fù)擔(dān),同時(shí)系統(tǒng)如果Slave越多,這樣的情況越嚴(yán)重。因此引入下面一種模式,多線程合并,但主干和虛擬分支同時(shí)進(jìn)行,當(dāng)需要合并時(shí)首先競(jìng)爭(zhēng)主干鎖,得到主干鎖的線程將這次需要合并的結(jié)果和以前合并的虛擬分支一起合并到主干,而如果沒有得到主干鎖的線程并行的合并結(jié)果到虛擬分支上。此時(shí)充分利用多核的計(jì)算能力來壓縮對(duì)于內(nèi)存的需求(結(jié)果合并后會(huì)大大減少存儲(chǔ)的需求)。

B. 由于A中的描述可以看到,主干在整個(gè)Job的任務(wù)執(zhí)行合并過程中都被保存在內(nèi)存中,因此當(dāng)結(jié)果集越大,主干對(duì)系統(tǒng)內(nèi)存消耗就越大,而Job的多輪合并是否可以最后載入上一輪的主干和本輪增量結(jié)果合并,這樣可以大大減少內(nèi)存消耗,但是內(nèi)容的導(dǎo)出和載入帶來的序列化代價(jià)和IO的消耗勢(shì)必會(huì)增加每一輪的處理時(shí)間,和減少GC帶來的節(jié)省時(shí)間的優(yōu)勢(shì)可能會(huì)沖抵甚至有負(fù)面效果。因此通過異步載入和導(dǎo)出,即節(jié)省了內(nèi)存占用,減少FullGC帶來的停頓,又不影響處理,另一方面其實(shí)也是利用兩個(gè)階段的CPU閑置率較高來交換內(nèi)存的代價(jià)。(這部分代碼參看jobexporter和jobmanager)

SlaveNode:
充分利用Slave單機(jī)CPU的方式可以是:一臺(tái)機(jī)器可以跑多個(gè)Slave。也可以跑一個(gè)Slave,單個(gè)Slave一次要求獲取多個(gè)Task,這樣可以并行利用多個(gè)cpu處理多個(gè)任務(wù)。
為了減少Master的合并壓力,可以讓Slave直接輸出,也可以通過Slave要求多個(gè)Task,執(zhí)行完多個(gè)Task在本地合并(Task必須是同一個(gè)Job才可以合并),再將合并后的結(jié)果會(huì)送給Master。
對(duì)于同一個(gè)數(shù)據(jù)源可以通過創(chuàng)建同樣的多個(gè)Task來增加對(duì)其的處理速度,例如A機(jī)器的日志增長比B機(jī)器的快,那么可以配置,兩個(gè)數(shù)據(jù)來源是A機(jī)器的Task,配置一個(gè)B機(jī)器的Task,來差別對(duì)待處理速度。
對(duì)于處理后的數(shù)據(jù)如果還需要二次處理,可以構(gòu)建Job的數(shù)據(jù)來源是一次處理后的數(shù)據(jù)輸出地,當(dāng)一次數(shù)據(jù)輸出以后,自然二次處理才會(huì)開始。
簡(jiǎn)單來說,很多復(fù)雜的sharding設(shè)計(jì),reduce的考慮,任務(wù)迭代處理,其實(shí)都可以通過扁平化的方式來解決,有時(shí)候花很很大力氣去做的看似很fancy的設(shè)計(jì),不如歸一化處理。(再大的數(shù)字都是從一衍生出來的)
Connector:
這部分設(shè)計(jì)主要是屏蔽掉分布式概念的誤區(qū),很多分布式設(shè)計(jì)開始的時(shí)候不是注重對(duì)于主節(jié)點(diǎn)和次節(jié)點(diǎn)的業(yè)務(wù)交互上,而是糾結(jié)于底層設(shè)計(jì)上,最后就會(huì)落得調(diào)試難,擴(kuò)展難的情形。和上面的歸一化設(shè)計(jì)思想一樣,所謂的分布式其實(shí)可以是一個(gè)進(jìn)程內(nèi)(虛擬機(jī)內(nèi))的交互協(xié)作,一臺(tái)機(jī)器多進(jìn)程的交互協(xié)作,多臺(tái)機(jī)器多進(jìn)程的交互協(xié)作,因此如何能夠適合這三個(gè)場(chǎng)景,就會(huì)讓設(shè)計(jì)變得簡(jiǎn)單,容易擴(kuò)展,實(shí)現(xiàn)與接口分離。
Event:
Event中需要考慮一些上下文設(shè)計(jì),例如序列號(hào)保證松散交互的會(huì)話可維護(hù)性,Channel等后續(xù)操作的基礎(chǔ)傳遞。Event盡量做到無業(yè)務(wù)侵入,例如雖然需要Channel,但不同的實(shí)現(xiàn)Channel是不同的,MemChannel和SocketChannel就不同,將來擴(kuò)展更是不同,做好一些就抽象一些接口(但可能需要對(duì)一些實(shí)現(xiàn)做外殼封裝適應(yīng)接口),或者就直接Object弱化類型。
InputAdaptor&OutputAdaptor:
任務(wù)的自描述性除了業(yè)務(wù)規(guī)則的自描述性,更需要輸入輸出的自描述性,所有計(jì)算歸結(jié)到底無非是輸入,處理,輸出,如果三者定義清楚,并且可以通過支持協(xié)議擴(kuò)展適配,那么對(duì)于計(jì)算節(jié)點(diǎn)來說就非常通用了,不必因?yàn)闃I(yè)務(wù)的差別,數(shù)據(jù)來源和輸出的差別來分別建立多個(gè)集群,最終還是發(fā)現(xiàn)多個(gè)集群無法很好的充分利用資源的高低峰(對(duì)于明確要保護(hù)的計(jì)算集群可以直接構(gòu)建,對(duì)于一些非關(guān)鍵性的計(jì)算任務(wù)可以丟到一個(gè)集群中搞定),降低成本。
Job:
本身是一組任務(wù)的集合,自身有多個(gè)狀態(tài)位,當(dāng)前通過多個(gè)狀態(tài)來表示(可以合并為一個(gè)原子狀態(tài)位),內(nèi)置一些鎖來控制主干的并發(fā)訪問,守護(hù)進(jìn)程的分配。(這點(diǎn)在另一個(gè)PipeComet項(xiàng)目中對(duì)于長連接管道下行守護(hù)進(jìn)程按需分配也有充分利用到)。
Operation:
這個(gè)包里面是將耗時(shí)的操作封裝為可以被外部線程獨(dú)立執(zhí)行的Runnable,可以看見在整體代碼里面有用外部線程異步執(zhí)行的,也有直接在線程里面阻塞執(zhí)行的,取決于對(duì)于結(jié)果返回的同步性需求,如果同步性需求明確,那么可以用異步+鎖的方式來模擬同步,也可以直接同步,但前者代價(jià)較大,所以將這類操作抽象,上下文通過參數(shù)傳遞來構(gòu)建出可以異步也可以同步執(zhí)行的邏輯塊,提高了功能執(zhí)行的靈活性。
CreateReportOperation中的輸出模式還是比較節(jié)省空間的,可以看一下如何基于<key,value>列矩陣輸出報(bào)表這樣的行式記錄保持對(duì)內(nèi)存較小的占用。
ReportUtil:
是個(gè)工具大雜燴。
1. mergeEntryResult。將多個(gè)矩陣結(jié)果合并的函數(shù),里面有不少的節(jié)省內(nèi)存的做法,首先選取第一個(gè)矩陣作為base,節(jié)省申請(qǐng)和合并的過程,合并過程中不斷刪除合并后的數(shù)據(jù),節(jié)省后續(xù)合并成本,釋放資源。
2. compressString。嘗試采用不可逆壓縮來減少處理中中間key占用的內(nèi)存,例如每一個(gè)entry的key是幾個(gè)列的組合,而key僅僅表示唯一性,如果能夠做到壓縮且不失唯一性,那么最終不會(huì)影響需要輸出的結(jié)果。這里采用短鏈接的處理方式。(md5+16以上的進(jìn)制模式)
TimeOutQueue:
最前面提到過,基本所有的外部對(duì)象狀態(tài)變更都可以被捕獲,然后產(chǎn)生一個(gè)事件,而超時(shí)事件必須是主動(dòng)檢查才可以判斷,因此當(dāng)對(duì)象數(shù)據(jù)量增加的時(shí)候,超時(shí)檢查的消耗就會(huì)變成O(N),一般會(huì)推薦使用分區(qū)模式(時(shí)間輪盤,時(shí)間槽)來縮減N增加帶來的影響,另一種方式比較適合于超時(shí)時(shí)間不會(huì)變動(dòng)的情況,就好比將一個(gè)對(duì)象放入后,它的超時(shí)時(shí)間從創(chuàng)建初期到銷毀都不會(huì)再改變,如果是這種情況,那么可以采用這個(gè)類的實(shí)現(xiàn)方式。
內(nèi)置一個(gè)有順序的單向鏈或者隊(duì)列,按照超時(shí)時(shí)間的前后建立先后順序,最早超時(shí)的對(duì)象放在最前面,內(nèi)部線程每次從隊(duì)列或者鏈的第一位開始檢查,如果發(fā)現(xiàn)超時(shí),則處理,繼續(xù)往后走,當(dāng)發(fā)現(xiàn)沒有超時(shí)的時(shí)候,獲得該對(duì)象距離超時(shí)時(shí)間的間隔,然后掛起這間隔的時(shí)間。期間有任何數(shù)據(jù)加入,如果超時(shí)時(shí)間小于隊(duì)列第一個(gè)對(duì)象超時(shí)時(shí)間,則加入隊(duì)列,然后喚醒檢查線程(切記順序不要反,先加入隊(duì)列,再喚醒)。最后在增加一個(gè)防止隊(duì)列為空的消費(fèi)者生產(chǎn)者的標(biāo)識(shí),保證不要空循環(huán)。