Author:放翁(文初)
Email:fangweng@taobao.com
Mblog:weibo.com/fangweng
Blog: http://blog.csdn.net/cenwenchu79/
局部設計
首先要說明的是,這部分內容和第一篇不同,必須對照代碼看才會理解其中的含義,光看設計實現會比較難懂其中所說的細節點。代碼:https://github.com/cenwenchu/beatles
IComponent:
做系統就像搭積木一樣,這些組件最后都會拼裝起來,而積木往往由于內部機制可定制化需要config,同時組合在一起的積木往往會有一個傳遞的Config(可以認為是靜態的Context)。
INode:
Node的設計比較簡單,自身是Runnable的單線程循環體,內置一個單線程事件監聽分
發器。Node的主線程主要負責該Node自身產生的事件處理(常規已知事件處理):Master就是維護任務列表狀態,并根據任務執行情況做一些Action,Slave就是重復的獲取,執行任務,返回結果。而Node中的事件監聽器主要用于外部消息驅動事件處理(偶發性事件處理),例如Master收到Slave的請求,外部要求導出載入中間結果等。這里會發現采用的是單線程阻塞檢查獲取事件:
1.多線程并發檢查事件,對于事件承載者(隊列)就要求做好并發控制,也就是最終在獲取事件的過程中依然是串行化,所以大部分事件處理框架對于事件檢查都采用單線程,簡單高效。
2.單線程可以用于檢查事件,但執行事件會采用多線程或者主線程直接處理,取決于事件執行速度和可靠性(外部依賴),如果事件可以快速執行(無外部依賴,邏輯簡單),則采用檢查線程直接處理(NIO中對于連接建立和銷毀直接在主線程中處理掉,這里Master對于獲取任務事件的前半段采用直接處理),如果事件消耗時間較久,或者依賴與外部系統穩定性和處理情況,則需要采用多線程異步處理(這里很多都是讓內部組件來保證方法執行的快速返回,例如JobExporter的所有方法等處理都是內部線程異步完成,對外接口快速返回),如果必須要有結果回執,那么可以采用回調模式或者直接提交新事件(帶有上下文可以接著上次處理)到事件處理引擎。
3.另一種方式會選擇將事件分開放置來提高處理效率,例如TimeOut事件和普通外部激發事件,注意盡量避免通過輪詢對象狀態來判斷事件發生,除了Timeout必須這么做,其他盡量通過對對象狀態變更操作內置事件產生器來創建真實事件,這樣事件處理者只需要阻塞等待事件即可,系統內狀態規模增大對于事件處理時間復雜度還是O(1)。對于Timeout這類事件后續介紹SlaveConnector中有更詳細介紹,至此略過。那是否需要將不同業務處理放置在不同的隊列交由不同的單線程處理,取決于系統事件產生速度,就好比NIO中可以起多個Selector來處理,由于起多個單線程守護到隊列如果利用率不高,其實對于系統來說也是一種負擔,所以可以做成可配置的方式提供給外部(beatles中是沒有提供配置,就一個線程,因為本身并不是大并發的web前端系統,接上千個slave的話消息量分布也不會非常密集,畢竟任務分析本身需要消耗時間)。
MasterNode中有兩個組件:JobManager和MasterConnector,一個負責上層業務處理,一個負責消息通信,在MasterNode運作的時候,兩者其實需要協調工作,例如MasterConnector可能會收到消息,需要提交給JobManager處理并獲得結果返回。為了實現內部組件不會相互依賴(MasterNode內部成為網狀結構),采用MasterNode作為中間消息傳遞者,通過事件或回調方式相互驅動,同時利用上下文(將Channel作為Event的一部結構,用于后續消息返回)來傳遞一些環境信息。需要注意的是,這種解耦的做法勢必帶來性能的下降,因此可以和前面提到的事件處理為多線程還是單線程一樣,對于消息機制的依賴也不要盲從,按需使用,例如Connector通過事件提交給MasterNode,MasterNode接收事件后調用JobManager處理,處理后的結果也可以利用事件機制反向驅動Master去調用Connector,但也可以直接將MasterNode植入JobManager,反向利用代理模式來直接處理,這里關鍵看你是否需要釋放掉你當前的線程,讓任務異步去做,而當前線程可以回收去做更多的處理,帶來的是線程切換和事件驅動的消耗。不過總體上來說讓組件的宿主來完成交互,能夠減少模塊間依賴帶來的耦合性和復雜度。
FileJobExporter
這個類主要用于文件輸出,但在輸出部分的代碼中有lazymerge的部分,所謂的lazy merge指的是部分entry<key,value>的結果是依賴于處理后的部分結果而得到,例如成功率這個指標就是用成功數/總數。作為分析系統來說,如果成功數的<key,value>需要長期保存,總數的<key,value>需要長期保存,那是否需要在最終產出報表以前就將成功率的<key,value>計算并保存在內存中呢?其實大可不必,不僅浪費了cpu資源,也浪費了大量的內存資源,同時slave傳遞給master還會使得網絡io消耗增大。在beatles中認為export就是最后的一步,因此在這個時候做計算和導出。在我們很多系統中,考慮一下很多中間結果是否需要輸出,還是保留在最后一步輸出(并不是保留在最后一步一定好,取決于代價,如果最后一步有大量的計算要做,那么可以用內存換機算,提早計算來減緩最后導出時的壓力,如果導出時計算不大,而系統整體處理內存資源緊張,那么就滯后處理)。衍生開來很多時候,需要考慮重復計算帶來的成本和節省內存帶來的收獲誰更有利,如果計算節點分散且規模巨大,則可以靠慮利用外部計算能力來減少集中式處理的代價(好比很多前端處理的結果可以滯后到客戶端處理而不是服務端集中處理,開放平臺的數據序列化推后到業務方集群處理而不是開放平臺統一處理)
JobManager
由于MasterNode中是單線程調用,因此對于任務狀態變更變得非常簡單(無需并發控制和原子操作),但由于MasterNode將來還是可擴展為多個線程處理,因此暫時保留原子操作的處理模式。
1. 對于對象狀態管理,如果對象層次比較多,盡量扁平化處理,就好比把TaskStatus直接保存,有利于檢查和原子操作,帶來的問題就是另一部分對象的狀態同步變更(Task中的狀態),其實簡單來說就是兩個數據結構修改要做到事務性,做法比較簡單,細粒度的原子操作模擬鎖爭奪,例如要修改Task的Status首先要并發的修改TaskStatus的數據(if (statusPool.replace(taskId, JobTaskStatus.DOING, JobTaskStatus.DONE)),如果修改成功,才可以修改原始對象內的數據。其實如果是單線程都不需要并發控制(因為并發的模式還是有些消耗的)。
2. 事件驅動模型中很重要一點就是事件狀態必須在所有必要操作后再改變(即創建事件),例如:早一個版本中,Master收到Slave返回結果時,將會把結果設置到Master的某一個Task的result屬性中,同時改變Task的狀態為done,這兩個動作就必須保持一定的順序,也就是先要把內容設置進去,然后再改變狀態,因為如果先改變狀態,外部事件處理線程如果發現狀態已經改變,又沒有鎖保證結果放進去以前不能處理這個事件,就會發現事件開始被處理了,但是內容還是錯過了處理,出現線程并發問題。這點在這個版本的源碼注釋上面有點問題,后續修改掉它。
3. 在主流程上有一個方法mergeAndExportJobs,用于檢查Job內部的Tasks完成狀態,決定是否合并或者導出結果,首先受限制于JobManager主流程是單線程處理,同時內部Tasks狀態隨時會變,因此要求主流程的所有操作和檢查都必須非阻塞,保證處理的即時性,但如果這個方法里面的所有操作都變成另起線程異步處理的話,就同樣會發生上面我談過的事件檢查多線程模式最終還是會并發控制下變成串行化,效率不升反降,因此采用同一業務性數據處理守護進程唯一性的方式(其實簡單來說就是在這里Master中管理多個Job,多個Job其實就好比多個事件隊列,因此必須并行處理,否則會有互相影響的風險,但是單個Job的處理可以只有一個守護線程處理,因此對Job加事件鎖,保證不同Job之間同一個事件并行,同一個job不同事件并行(這里由于都是順序化的,雖然并行了,但還是要等待上一個事件完成后才會修改內部狀態繼續往下走))
4. 在第一篇里面說到,這個框架對于任務執行異常的處理十分簡單,事先規定好單個任務執行的最長可接受時間,如果到了時間尚未獲得反饋,就認為出現問題,任務重置可以接受下一個計算節點的處理請求。(結果誰先返回就用誰的)這里其實要注意兩點:任務時間可評估是基于任務切分粒度夠細,其實很多時候可以考慮通過任務細化來降低任務出現問題解決的復雜度,同時也可以降低計算節點重新做任務的代價。另一方面需要設置重置次數透明化,保證如果任務本身有問題(例如數據來源出現問題),不會使得所有的計算節點陷入單個任務處理死循環。
5. 合并數據的代碼優化:
A. Master合并時每一個Job只有一個主干,也就是最后job的所有Task Result都必須合并到這個主干,假設這是個svn主干,可以想象多個人(多線程)是無法并行合并的。那么當主線程在A時刻發現有4個結果需要合并的時候,它開始把4個結果合并到主干,合并的過程中可能又來了3個結果,那么這三個結果就必須等待下一輪的合并開始,此時這三個結果耗費的內存就會增加系統的負擔,同時系統如果Slave越多,這樣的情況越嚴重。因此引入下面一種模式,多線程合并,但主干和虛擬分支同時進行,當需要合并時首先競爭主干鎖,得到主干鎖的線程將這次需要合并的結果和以前合并的虛擬分支一起合并到主干,而如果沒有得到主干鎖的線程并行的合并結果到虛擬分支上。此時充分利用多核的計算能力來壓縮對于內存的需求(結果合并后會大大減少存儲的需求)。
B. 由于A中的描述可以看到,主干在整個Job的任務執行合并過程中都被保存在內存中,因此當結果集越大,主干對系統內存消耗就越大,而Job的多輪合并是否可以最后載入上一輪的主干和本輪增量結果合并,這樣可以大大減少內存消耗,但是內容的導出和載入帶來的序列化代價和IO的消耗勢必會增加每一輪的處理時間,和減少GC帶來的節省時間的優勢可能會沖抵甚至有負面效果。因此通過異步載入和導出,即節省了內存占用,減少FullGC帶來的停頓,又不影響處理,另一方面其實也是利用兩個階段的CPU閑置率較高來交換內存的代價。(這部分代碼參看jobexporter和jobmanager)
SlaveNode:
充分利用Slave單機CPU的方式可以是:一臺機器可以跑多個Slave。也可以跑一個Slave,單個Slave一次要求獲取多個Task,這樣可以并行利用多個cpu處理多個任務。
為了減少Master的合并壓力,可以讓Slave直接輸出,也可以通過Slave要求多個Task,執行完多個Task在本地合并(Task必須是同一個Job才可以合并),再將合并后的結果會送給Master。
對于同一個數據源可以通過創建同樣的多個Task來增加對其的處理速度,例如A機器的日志增長比B機器的快,那么可以配置,兩個數據來源是A機器的Task,配置一個B機器的Task,來差別對待處理速度。
對于處理后的數據如果還需要二次處理,可以構建Job的數據來源是一次處理后的數據輸出地,當一次數據輸出以后,自然二次處理才會開始。
簡單來說,很多復雜的sharding設計,reduce的考慮,任務迭代處理,其實都可以通過扁平化的方式來解決,有時候花很很大力氣去做的看似很fancy的設計,不如歸一化處理。(再大的數字都是從一衍生出來的)
Connector:
這部分設計主要是屏蔽掉分布式概念的誤區,很多分布式設計開始的時候不是注重對于主節點和次節點的業務交互上,而是糾結于底層設計上,最后就會落得調試難,擴展難的情形。和上面的歸一化設計思想一樣,所謂的分布式其實可以是一個進程內(虛擬機內)的交互協作,一臺機器多進程的交互協作,多臺機器多進程的交互協作,因此如何能夠適合這三個場景,就會讓設計變得簡單,容易擴展,實現與接口分離。
Event:
Event中需要考慮一些上下文設計,例如序列號保證松散交互的會話可維護性,Channel等后續操作的基礎傳遞。Event盡量做到無業務侵入,例如雖然需要Channel,但不同的實現Channel是不同的,MemChannel和SocketChannel就不同,將來擴展更是不同,做好一些就抽象一些接口(但可能需要對一些實現做外殼封裝適應接口),或者就直接Object弱化類型。
InputAdaptor&OutputAdaptor:
任務的自描述性除了業務規則的自描述性,更需要輸入輸出的自描述性,所有計算歸結到底無非是輸入,處理,輸出,如果三者定義清楚,并且可以通過支持協議擴展適配,那么對于計算節點來說就非常通用了,不必因為業務的差別,數據來源和輸出的差別來分別建立多個集群,最終還是發現多個集群無法很好的充分利用資源的高低峰(對于明確要保護的計算集群可以直接構建,對于一些非關鍵性的計算任務可以丟到一個集群中搞定),降低成本。
Job:
本身是一組任務的集合,自身有多個狀態位,當前通過多個狀態來表示(可以合并為一個原子狀態位),內置一些鎖來控制主干的并發訪問,守護進程的分配。(這點在另一個PipeComet項目中對于長連接管道下行守護進程按需分配也有充分利用到)。
Operation:
這個包里面是將耗時的操作封裝為可以被外部線程獨立執行的Runnable,可以看見在整體代碼里面有用外部線程異步執行的,也有直接在線程里面阻塞執行的,取決于對于結果返回的同步性需求,如果同步性需求明確,那么可以用異步+鎖的方式來模擬同步,也可以直接同步,但前者代價較大,所以將這類操作抽象,上下文通過參數傳遞來構建出可以異步也可以同步執行的邏輯塊,提高了功能執行的靈活性。
CreateReportOperation中的輸出模式還是比較節省空間的,可以看一下如何基于<key,value>列矩陣輸出報表這樣的行式記錄保持對內存較小的占用。
ReportUtil:
是個工具大雜燴。
1. mergeEntryResult。將多個矩陣結果合并的函數,里面有不少的節省內存的做法,首先選取第一個矩陣作為base,節省申請和合并的過程,合并過程中不斷刪除合并后的數據,節省后續合并成本,釋放資源。
2. compressString。嘗試采用不可逆壓縮來減少處理中中間key占用的內存,例如每一個entry的key是幾個列的組合,而key僅僅表示唯一性,如果能夠做到壓縮且不失唯一性,那么最終不會影響需要輸出的結果。這里采用短鏈接的處理方式。(md5+16以上的進制模式)
TimeOutQueue:
最前面提到過,基本所有的外部對象狀態變更都可以被捕獲,然后產生一個事件,而超時事件必須是主動檢查才可以判斷,因此當對象數據量增加的時候,超時檢查的消耗就會變成O(N),一般會推薦使用分區模式(時間輪盤,時間槽)來縮減N增加帶來的影響,另一種方式比較適合于超時時間不會變動的情況,就好比將一個對象放入后,它的超時時間從創建初期到銷毀都不會再改變,如果是這種情況,那么可以采用這個類的實現方式。
內置一個有順序的單向鏈或者隊列,按照超時時間的前后建立先后順序,最早超時的對象放在最前面,內部線程每次從隊列或者鏈的第一位開始檢查,如果發現超時,則處理,繼續往后走,當發現沒有超時的時候,獲得該對象距離超時時間的間隔,然后掛起這間隔的時間。期間有任何數據加入,如果超時時間小于隊列第一個對象超時時間,則加入隊列,然后喚醒檢查線程(切記順序不要反,先加入隊列,再喚醒)。最后在增加一個防止隊列為空的消費者生產者的標識,保證不要空循環。