https://github.com/jonenine/HST
雖然大數(shù)據(jù)的發(fā)展已經(jīng)將近10個年頭了,hadoop技術仍然沒有過時,特別是一些低成本,入門級的小項目,使用hadoop還是蠻不錯的。而且,也不是每一個公司都有能力招聘和培養(yǎng)自己的spark人才。
我本人對于hadoop mapreduce是有一些意見的。hadoop mapreduce技術對于開發(fā)人員的友好度不高,程序難寫,調(diào)試困難,對于復雜的業(yè)務邏輯遠沒有spark得心應手。
2016年的春節(jié)前接到一個任務,要在一個沒有spark的平臺實現(xiàn)電力系統(tǒng)的一些統(tǒng)計分析算法,可選的技術只有hadoop mapreduce。受了這個刺激之后產(chǎn)生了一些奇思妙想,然后做了一些試驗,并最終形成HST---hadoop simplize toolkit,還真是無心載柳柳成蔭啊。
HST基本優(yōu)點如下:
屏蔽了hadoop數(shù)據(jù)類型,取消了driver,將mapper和reducer轉(zhuǎn)化為transformer和joiner,業(yè)務邏輯更接近sql。相當程度的減少了代碼量,極大的降低了大數(shù)據(jù)編程的門檻,讓基層程序員通過簡單的學習即可掌握大數(shù)據(jù)的開發(fā)。
克服了hadoop mapreduce數(shù)據(jù)源單一的情況,比如在一個job內(nèi),input可以同時讀文件和來自不同集群的hbase。
遠程日志系統(tǒng),讓mapper和reducer的日志集中到driver的控制臺,極大減輕了并行多進程程序的調(diào)試難度。
克服了hadoop mapreduce編寫業(yè)務邏輯時,不容易區(qū)分數(shù)據(jù)來自哪個數(shù)據(jù)源的困難。接近了spark(或者sql)的水平。
天生的多線程執(zhí)行,即在mapper和reducer端都默認使用多線程來執(zhí)行業(yè)務邏輯。
對于多次迭代的任務,相連的兩個任務可以建立關聯(lián),下一個任務直接引用上一個任務的結果,使多次迭代任務的代碼結構變得清晰優(yōu)美。
以下會逐條說明
基本概念的小變化:
Source類代替了hadoop Input體系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了飽受詬病的Driver,改為內(nèi)置的實現(xiàn),現(xiàn)在完全不用操心了。
1. 基本上,屏蔽了hadoop的數(shù)據(jù)類型,使用純java類型
在原生的hadoop mapreduce開發(fā)中,使用org.apache.hadoop.io包下的各種hadoop數(shù)據(jù)類型,比如hadoop的Text類型,算法的編寫中一些轉(zhuǎn)換非常不方便。而在HST中一律使用java基本類型,完全屏蔽了hadoop類型體系。
比如在hbase作為source(Input)的時候,再也不用直接使用ImmutableBytesWritable和Result了,HST為你做了自動的轉(zhuǎn)換。
現(xiàn)在的mapper(改名叫Transformer了)風格是這樣的
public static class TransformerForHBase0 extends HBaseTransformer<Long>
…
現(xiàn)在map方法叫flatmap,看到?jīng)],已經(jīng)幫你自動轉(zhuǎn)成了string和map
public void flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可閱讀xs.hadoop.iterated.IteratedUtil類中關于類型自動轉(zhuǎn)換的部分
2. 克服了hadoop mapreduce數(shù)據(jù)源單一的情況。比如在一個job內(nèi),數(shù)據(jù)源同時讀文件和hbase,這在原生的hadoop mapreduce是不可能做到的
以前訪問hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,現(xiàn)在完全改為與spark相似的方式。
現(xiàn)在的風格是這樣的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以來自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,現(xiàn)在完全由source類來代替。通過內(nèi)置的機制轉(zhuǎn)化為inputformat,inputsplit和reader。在HST的框架下,其實可以很容易的寫出諸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一個hadoop job中,你終于可以將任意數(shù)據(jù)源,例如來自不同集群的HBASE和來自數(shù)據(jù)庫的source進行join了,這是多么happy的事情啊!
3. 遠程日志系統(tǒng)。讓mapper和reducer的日志集中在driver進行顯示,極大減輕了了并行多進程程序的調(diào)試難度
各位都體驗過,job fail后到控制臺頁面,甚至ssh到計算節(jié)點去查看日志的痛苦了吧。對,hadoop原生的開發(fā),調(diào)試很痛苦的呢!
現(xiàn)在好了,有遠程日志系統(tǒng),可以在調(diào)試時將mapper和reducer的日志集中在driver上,錯誤和各種counter也會自動發(fā)送到driver上,并實時顯示在你的控制臺上。如果在eclipse中調(diào)試程序,就可以實現(xiàn)點擊console中的錯誤,直接跳到錯誤代碼行的功能嘍!
Ps:有人可能會問,如何在集群外使用eclipse調(diào)試一個job,卻可以以集群方式運行呢?這里不再贅述了,網(wǎng)上有很多答案的哦
4. 克服了hadoop mapreduce在join上,區(qū)分數(shù)據(jù)來自哪個數(shù)據(jù)源的困難,接近spark(或者sql)的水平
在上面給出示例中,大家都看到了,現(xiàn)在的mapper可以綁定input嘍!,也就是每個input都有自己獨立的mapper。正因為此,現(xiàn)在的input和mapper改名叫Source和Transformer。
那么,大家又要問了,在mapper中,我已經(jīng)可以輕松根據(jù)不同的數(shù)據(jù)輸入寫出不同的mapper了,那reducer中怎么辦,spark和sql都是很容易實現(xiàn)的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么輕松愉悅啊!
在原生hadoop mapreduce中,在reducer中找出哪個數(shù)據(jù)對應來自哪個input可是一個令人抓狂的問題呢!
現(xiàn)在這個問題已經(jīng)被輕松解決嘍!看下面這個joiner,對應原生的reducer
public static class Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更貼近sql的概念呢?
public void join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//對應索引為0的source
List<Object> row2 = handler.getSingleFieldRows(1);//對應第二個定義的source
注意上面兩句,可以按照數(shù)據(jù)源定義的索引來取出來自不同數(shù)據(jù)源join后的數(shù)據(jù)了,以后有時間可能會改成按照別名來取出,大家看源碼的時候,會發(fā)現(xiàn)別名這個部分的接口都寫好了,要不你來幫助實現(xiàn)了吧。
5. 天生的多線程執(zhí)行,即在mapper和reducer端都默認使用多線程來執(zhí)行業(yè)務邏輯。
看看源碼吧,HST框架是并發(fā)調(diào)用flatMap和join方法的,同時又不能改變系統(tǒng)調(diào)用reduce方法的順序(否則hadoop的辛苦排序可就白瞎了),這可不是一件容易的事呢!
看到這里,有的同學說了。你這個HST好是好,但你搞的自動轉(zhuǎn)換類型這個機制可能會把性能拉下來的。這個嗎,不得不承認,可能是會有一點影響。但在生產(chǎn)環(huán)境做的比對可以證明,影響太小了,基本忽略不計。
筆者在生產(chǎn)環(huán)境做了做了多次試驗,mapper改成多線程后性能并未有提高,特別是對一些業(yè)務簡單的job,增加Transformer中的并發(fā)級別效率可能還會下降。
很多同學喜歡在mapper中做所謂“mapper端的join”。這種方式,相信在HST中通過提高mapper的并發(fā)級別后會有更好的表現(xiàn)。
Reducer中的性能相對原生提升的空間還是蠻大的。大部分的mapreduce項目,都是mapper簡單而reducer復雜,HST采用并發(fā)執(zhí)行join的方式對提升reducer性能是超好的。
6. 對于多次迭代的任務,相連的兩個任務可以建立關聯(lián),在流程上的下一個job直接引用上一個job的結果,使多次迭代任務的代碼結構變得清晰優(yōu)美
雖然在最后才提到這一點,但這卻是我一開始想要寫HST原因。多次迭代的任務太麻煩了,上一個任務要寫在hdfs做存儲,下一個任務再取出使用,麻煩不麻煩。如果都由程序自動完成,豈不美哉!
在上一個任務里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各種source定義
.format("f1","f2")
在第二個任務中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
public static class Transformer2_0 extends PreviousResultTransformer<Long>
...
public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到?jīng)],就是這么簡單。
在最開始的計劃中,我還設計了使用redis隊列來緩沖前面job的結果,供后面的job作為輸入。這樣本來必須嚴格串行的job可以在一定程度上并發(fā)。另外還設計了子任務的并發(fā)調(diào)度,這都留給以后去實現(xiàn)吧。
7. 便捷的自定義參數(shù)傳遞。
有時候,在業(yè)務中需要作一些“開關變量”,在運行時動態(tài)傳入不同的值以實現(xiàn)不同的業(yè)務邏輯。這個問題HST框架其實也為你考慮到了。
Driver中的自定義參數(shù),source中的自定義參數(shù)都會以內(nèi)置的方式傳到transformer或joiner中去,方便程序員書寫業(yè)務。
查看transformer或joiner的源碼就會發(fā)現(xiàn):
getSourceParam(name)和getDriverParam(pIndex)方法,在計算節(jié)點輕松的得到在driver和source中設置的各層次級別的自定義參數(shù),爽吧!
8. 其他工具
HST提供的方便還不止以上這些,比如在工具類中還提供了兩行數(shù)據(jù)(map類型)直接join的方法。這些都留給你自己去發(fā)現(xiàn)并實踐吧!
https://github.com/jonenine/HST