Author:放翁(文初)
Email:fangweng@taobao.com
Blog:http://blog.csdn.net/cenwenchu79
閑話:(如果圖片看不清楚可以看另一個blog,因為圖片在家,這里上傳就只能轉貼了)
為什么又叫做什么…的點滴,首先現在寫程序就是練手,不論自己經歷了多少,如果想成為一個好的P,那么就要持續的去學習,去寫,當寫出來的東西總是一個樣子,那就要去學習一下,當覺得整天飄飄然的和同行在胡侃,那么就要靜下心來寫點東西。因此我的分享總是這個點滴那個點滴的,其實大家寫程序都大同小異,最寶貴的不是一個系統如何成功,而是在設計和實現這個系統的過程中,有哪些閃光點,這些閃光點日積月累就會讓你寫出來的東西給人一種“踏實”的感覺,同時不斷的多想一步,會讓你總是比別人做得更加精彩。
起因:
現在手頭工作太忙,所以分享的東西很多,但是沒有時間寫,因此這個MapReduce “單機版”說說是練手,但其實和當前TOP的業務也很有關系。過去在開放平臺中有重要的一部分內容就是日志分析和報表的功能,由于開放平臺的API請求里量很大,因此過去首先采用異步日志結合MySql分表模式來做日志分析,后來演化成為了異步日志結合Hadoop的分析模式。
TOP當前對于日志系統的需求是:
1. 滿足不穩定的需求統計分析(性能監控調優,業務趨勢分析,ISV行為統計等),快速出結果。(框架靈活度要高)
2. 分析系統配置使用簡單。(部署簡單,維護簡單,使用簡單)
3. 硬件資源節省。(資源投入少,長期有規劃上有規模化的集群分析計算)
4. 短期內上線。(開發成本低)
5.處理速度可接受。
根據以上幾點當前的需求,起碼現階段的TOP需要的分析器不需要用Hadoop,同時采用Hadoop在業務上不能滿足靈活的需求(發布要走流程),使用也過于復雜,硬件資源投入起碼兩到三臺PC,開發流程上由于業務的需求變動比較大,這樣如果采用比較硬的編碼方式,則很難短期上線。對于當前TOP的日志量用Hadoop的速度優勢暫時還不明顯。
就上面這些因素,考慮化幾天時間做一個單機版的MapReduce的日志分析器,滿足現有需求。
設計:

圖1 Simple MapReduce Log Analysis UseCase
圖2 基本流程定義

圖3 角色工作圖
系統簡化來看就分成三個角色:
1. JobManager:負責讀取系統配置,和初始化分析規則引擎,切割文件,創建Worker,協同Worker并行分析,合并分析結果,輸出報表。
2. RuleEngine:根據配置載入和構建日志解析規則(可定制化Map和Reduce實現),中間結果合并規則,報表創建規則,附帶發送郵件等功能配置。
3. JobWorker:根據規則引擎配置,逐行分析日志,每行分析出所有配置需要的結果,作一次簡單的MapReduce操作,輸出中間結果給Manager。
實現:
一.報表配置及規則引擎
a. 兩個層次。在Hadoop的MapReduce的計算結果是Key&Value,這通常并不是我們很多分析系統希望要的最終結果,分析系統希望是得到類似于SQL查詢到的一組結果,反過來看,對于一組結果其實就是一堆Key&Value的組合:
例如:
Select name,address,count(*) form t 得到的結果就是以name&address組合成為key,然后累加次數產生的value。
Select name,address,average(age) form t得到的結果就是以name&address組合成為key,然后平均年齡得到的value。
而這兩個結果由于都是以相同的兩個字段作為索引,因此歸類在一起就會形成我們通常希望看到的一個報表。因此產生了定義的報表配置的兩個層次:
1. ReportEntry就是一個key&value產生的規則定義。
2. Report就是單個報表創建的定義。
b. 五種基本統計函數,對于統計來說大多都是對數字的處理,抽象起來公用的主要有五種:min,max,sum,count,average。同時為了能夠提供顯示主鍵的功能,提供一個直接顯示內容的plain函數,這樣基本涵蓋了70%的統計需求。
c. 兩種表達式創建Value。對于value的創建可以設置表達式,比如說每一條記錄的第三列減去第四列的最大值可以配置為value=”$3$ - $4$”,用$符號分割表示對列的引用。也可以定義某一統計結果是其他列統計結果的計算結果,例如成功率可以使成功數量/總量,配置為value=”entry(成功數列號)/entry(總量列號)”,此類結果將在報表創建時候才被計算生成,屬于lazy分析。
下圖是具體的類定義圖

具體配置參見附錄說明。
二.切割日志文件
單臺服務器單日最大的日志有1G多的日志,對于這么大的日志需要考慮切分一下交由多個Jobworker來并行處理提高效率。因此就涉及到了切割文件的工作。切割文件就需要做到高效,數據完整性。
高效:一般切割是對一個目錄下所有文件切割,因此起一個線程池并行切割,提高效率。同時對于單個文件的切割,采用FileChannel的方式(MapFile),簡單按照配置大小切割成子文件。
數據完整性:由于TOP的日志文件是以回車換行作為記錄分割符,因此從第二個文件開始,每一個文件讀取第一句有回車換行的內容到上一個文件,這樣就可以保證數據的完整性(簡單補償方案),需要注意的就是邊界情況,當最后一個文件就一句話內容,那么這句內容一旦被提前,需要刪除這個子文件。
遇到的問題:
1. 直接根據設置的文件塊來拷貝,導致多線程并行處理時,native方法消耗內存溢出(這個其實在很多第三方的開源包中處理不好都會有這樣的問題),由于Jdk提供了內存映像文件,提高速度的同時,也為這類內存申請使用帶來了內存溢出的隱患(這類內存的回收和普通的GC回收不同,回收的時機也不一樣),因此當機器速度越快的時候,可能溢出的情況越容易發生。于是將inChannel.transferTo(beg,blocksize, outChannel)改成了一段一段的復制。
2. 單機磁盤IO瓶頸在某種程度上決定了多線程并行未必會提高多少處理效率。
3. 有同學和我說你其實不用切割,直接用RandomAccessFile來讀取分析就可以了,節省時間。感覺很有道理,前期陷入了思維定勢,但是對于單機來說磁盤IO及文件鎖使得虛擬切割的效率還不如單線程處理。(作了一下測試)
三.JobWorker實現
對于通過數據庫來做統計的情況,通常會需要Select幾次才會得到一個報表的幾項結果,但對于逐行掃描處理的情況來說,不論配置多少Entry,在一次日志讀取以后就能根據規則來計算出這個Entry的結果,因此對于海量數據的分析,在一次數據遍歷以后就可以得到所有的結果。這點也是去年我和開放平臺同學review他的hadoop MapReduce的時候提出的建議,如果就做一次MapReduce就需要分析一次數據,那么肯定會效率很低,通常就是需要定義一個Map就能夠作很多規則的分析,這就需要對于在傳統MapReduce中作較好的層次級別規劃,一次數據分析能夠被多個分析共享。而在這里設計JobWorker來說,本身逐行解析就可以實現這點,這也使得報表不論定義多少,分析的時間復雜度幾乎沒有增加。

IReportManager作用就是管理整個分析流程,初始化資源(載入配置,初始化規則引擎,切割文件),創建協調工作者,合并結果集,出報表。

定義了Worker兩個實現,一個是真實文件處理的Worker,一個是虛擬文件處理的Worker。(所謂的虛擬文件,就是上面提到的虛擬切割后生成的虛擬文件)在JobWorker處理中,可以根據規則引擎中定義的單個Entry處理模式和定制化Map或者Reduce實現來替換框架已有的Map和Reduce滿足不同的業務需求。具體的Map和Reduce參看下面的類圖。
IReportMap就一個接口generateKeyReportEntry(ReportEntry entry,Sting[] contents),也就是每一個Map都可以得到當前處理的數據內容以及當前Entry的數據定義。IReportMap和下面的IReportReduce都是可以通過運行期配置的方式替換現有框架中的業務邏輯。

Worker具體流程圖如下:

在流程中允許用自定義的Map和Reduce實現類替換默認的處理類,滿足用戶個性化需求,同時降低對于基礎框架的依賴。
問題:
1.多線程池(Executor等)必須控制好線程數量,防止內存溢出。
2.瓶頸在IO,因此效率提高有限。
3.自定義協議解析替換了J2se 6的js engine。JS引擎很強大,但是效率不高,在大規模數據處理的時候耗時嚴重,成為最大的瓶頸,因此采用簡單的算法來替換。
四.報表生成
最終報表的輸出類型,選擇了csv,首先由于csv結果排版簡單,其次,可以借助excel的強大圖形功能將數字結果轉換成為更加直觀的圖形結果。
比較和改進
傳統的MapReduce步驟如下:導入數據到分布式文件系統(切割文件,文件傳輸到dataNode,同時做好容災準備),JobNode在JobTracker的協調下開始分析,并在本地作一次reduce(減少數據傳輸),再匯總作Reduce,最后生成結果。
單機版MapReduce,只是將多機協作變成了多線程協作。
1. 省略數據傳輸不用讓數據靠近計算。
2. 通過配置文件的方式定制報表,可以靈活的將報表系統變成隨時可以根據需求變動的動態分析系統。(每次配置文件可以從遠端讀取,這樣就可以不發布而立刻獲得不同的報表)。
3. 使用便利,通過一個系統配置文件設定系統運行參數,然后直接執行jar,即可運行,不需要配置多機環境。
4. 對于幾十個G的數據處理正合適(效率)
5. 開發調試周期短,基本上5個人日開發+測試就搞定了。
但其實缺陷還是很一目了然的,就是我們為什么要用MapReduce的多機配置的初衷,單機最終在CPU,IO上都成為瓶頸,垂直擴容和水平擴容已經沒有什么好爭議的了,因此采用多機合作在規模化的處理上是必然趨勢。對于這個單機版作適當的調整,成為簡化型日志分析專用型多機版MapReduce還是蠻有必要的。那么切入點其實就是以下幾點:
1. 分析數據將會散布到多機,分別切割處理。(可以不考慮容災)
2. 多機多線程分析并在本機Reduce一次,然后通過配置找到內部的Master,交由Master來最終作Reduce。(workernode之間無需知道對方的存在)
3. Master由原來內存結果合并,轉變為傳輸過來的結果反序列化或者遵照私有消息格式來合并結果,最終創建出報表。
因此簡單來說就是數據分布及中間結果的傳遞和合并的工作處理一下,單機版就變成了多機版。最后就在附錄中詳細說明一下配置及運行后的效果。
附錄:
配置實例說明:
下面寫一個具體的實際配置來展示如何配置一個簡單的報表:(其中衍生出一些需求增強的內容)
配置文件是xml格式的,配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<top_reports>
<entrys>//定義需要給多個報表復用的Entry
<ReportEntry id="1" name="api_totalCount" key="6" value="count()"/>//id是這個entry唯一的標識(后面被引用到報表的依據),name將會作為報表的列名,key表示會以日志記錄的第幾項內容作為索引,可以通過逗號分割(組合索引),value表示創建的值是按照什么規則來創建,count函數不需要有內部表達式,average,min,max,plain都需要有表達式,表達式內部$16$代表日志記錄第幾位作為參數傳入運算,entry(16)代表對第16的entry結果作引用計算(具體參見下面配置)。這句話表示用第六位這個api名稱字段作為索引,計算各個api的總調用量。
<ReportEntry id="2" name="api_successCount" key="6" value="count()"
mapClass="com.taobao.top.analysis.map.APIErrorCodeMap" mapParams="key=6&errorCode=0"/>//這個配置和上面不同的就在于mapClass可以自定義實現,其實也就是實現了對于Key產生的規則實現,也就是MapReduce模型中的Map函數實現,這里可以用默認的(即簡單的列組合),也可以采用復雜的方式過濾和合并key,只需要實現Map接口即可,這后面詳細描述。這個配置表示產生key的過程中過濾掉不成功的請求,只統計成功請求
<ReportEntry id="3" name="api_failCount" key="6" value="count()"
mapClass="com.taobao.top.analysis.map.APIErrorCodeMap" mapParams="key=6&errorCode=1"/>//統計錯誤請求
<ReportEntry id="4" name="api_AverageServiceTimeConsume" key="6" value="average($14$ - $13$)" />//統計服務平均相應時間,由于在服務處理前后有時間打點,因此簡單的相減即可
<ReportEntry id="5" name="api_AverageTIPTimeConsume" key="6" value="average($16$ - $11$ - $14$ + $13$)" />
<ReportEntry id="6" name="api_MinServiceTimeConsume" key="6" value="min($14$ - $13$)" />//最小時間消耗
<ReportEntry id="7" name="api_MaxServiceTimeConsume" key="6" value="max($14$ - $13$)" />//最大時間消耗
……
</entrys>
<reports>
//具體的報表定義
<report id="2" file="apiReport" mailto="wenchu.cenwc@alibaba-inc.com">
<entryList>
<entry name="APIName" key="6" value="plain($6$)" />//不需要復用的entry可以直接定義在報表內部,這個定義表示直接顯示第六列即API的名稱
<entry id="1"/>
<entry id="2"/>
<entry name="APISuccessRatio" key="6" value="plain(entry(2)/entry(1))" /> //可以計算比例,通過對entry1和entry2的結果相除,不過這個就不是在逐行分析過程中實現,而是在結果合并時處理,屬于lazy后處理
<entry id="3"/>
<entry id="4"/>
<entry id="5"/>
<entry name="TIPTimeConsumeRatio" key="6" value="plain(entry(5)/entry(5)+entry(4))" />
<entry id="6"/>
<entry id="7"/>
</entryList>
</report>
</reports>
</top_reports>
