https://github.com/jonenine/HST
雖然大數據的發展已經將近10個年頭了,hadoop技術仍然沒有過時,特別是一些低成本,入門級的小項目,使用hadoop還是蠻不錯的。而且,也不是每一個公司都有能力招聘和培養自己的spark人才。
我本人對于hadoop mapreduce是有一些意見的。hadoop mapreduce技術對于開發人員的友好度不高,程序難寫,調試困難,對于復雜的業務邏輯遠沒有spark得心應手。
2016年的春節前接到一個任務,要在一個沒有spark的平臺實現電力系統的一些統計分析算法,可選的技術只有hadoop mapreduce。受了這個刺激之后產生了一些奇思妙想,然后做了一些試驗,并最終形成HST---hadoop simplize toolkit,還真是無心載柳柳成蔭啊。
HST基本優點如下:
屏蔽了hadoop數據類型,取消了driver,將mapper和reducer轉化為transformer和joiner,業務邏輯更接近sql。相當程度的減少了代碼量,極大的降低了大數據編程的門檻,讓基層程序員通過簡單的學習即可掌握大數據的開發。
克服了hadoop mapreduce數據源單一的情況,比如在一個job內,input可以同時讀文件和來自不同集群的hbase。
遠程日志系統,讓mapper和reducer的日志集中到driver的控制臺,極大減輕了并行多進程程序的調試難度。
克服了hadoop mapreduce編寫業務邏輯時,不容易區分數據來自哪個數據源的困難。接近了spark(或者sql)的水平。
天生的多線程執行,即在mapper和reducer端都默認使用多線程來執行業務邏輯。
對于多次迭代的任務,相連的兩個任務可以建立關聯,下一個任務直接引用上一個任務的結果,使多次迭代任務的代碼結構變得清晰優美。
以下會逐條說明
基本概念的小變化:
Source類代替了hadoop Input體系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了飽受詬病的Driver,改為內置的實現,現在完全不用操心了。
1. 基本上,屏蔽了hadoop的數據類型,使用純java類型
在原生的hadoop mapreduce開發中,使用org.apache.hadoop.io包下的各種hadoop數據類型,比如hadoop的Text類型,算法的編寫中一些轉換非常不方便。而在HST中一律使用java基本類型,完全屏蔽了hadoop類型體系。
比如在hbase作為source(Input)的時候,再也不用直接使用ImmutableBytesWritable和Result了,HST為你做了自動的轉換。
現在的mapper(改名叫Transformer了)風格是這樣的
public static class TransformerForHBase0 extends HBaseTransformer<Long>
…
現在map方法叫flatmap,看到沒,已經幫你自動轉成了string和map
public void flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可閱讀xs.hadoop.iterated.IteratedUtil類中關于類型自動轉換的部分
2. 克服了hadoop mapreduce數據源單一的情況。比如在一個job內,數據源同時讀文件和hbase,這在原生的hadoop mapreduce是不可能做到的
以前訪問hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,現在完全改為與spark相似的方式。
現在的風格是這樣的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以來自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,現在完全由source類來代替。通過內置的機制轉化為inputformat,inputsplit和reader。在HST的框架下,其實可以很容易的寫出諸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一個hadoop job中,你終于可以將任意數據源,例如來自不同集群的HBASE和來自數據庫的source進行join了,這是多么happy的事情?。?/span>
3. 遠程日志系統。讓mapper和reducer的日志集中在driver進行顯示,極大減輕了了并行多進程程序的調試難度
各位都體驗過,job fail后到控制臺頁面,甚至ssh到計算節點去查看日志的痛苦了吧。對,hadoop原生的開發,調試很痛苦的呢!
現在好了,有遠程日志系統,可以在調試時將mapper和reducer的日志集中在driver上,錯誤和各種counter也會自動發送到driver上,并實時顯示在你的控制臺上。如果在eclipse中調試程序,就可以實現點擊console中的錯誤,直接跳到錯誤代碼行的功能嘍!
Ps:有人可能會問,如何在集群外使用eclipse調試一個job,卻可以以集群方式運行呢?這里不再贅述了,網上有很多答案的哦
4. 克服了hadoop mapreduce在join上,區分數據來自哪個數據源的困難,接近spark(或者sql)的水平
在上面給出示例中,大家都看到了,現在的mapper可以綁定input嘍!,也就是每個input都有自己獨立的mapper。正因為此,現在的input和mapper改名叫Source和Transformer。
那么,大家又要問了,在mapper中,我已經可以輕松根據不同的數據輸入寫出不同的mapper了,那reducer中怎么辦,spark和sql都是很容易實現的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么輕松愉悅啊!
在原生hadoop mapreduce中,在reducer中找出哪個數據對應來自哪個input可是一個令人抓狂的問題呢!
現在這個問題已經被輕松解決嘍!看下面這個joiner,對應原生的reducer
public static class Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更貼近sql的概念呢?
public void join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//對應索引為0的source
List<Object> row2 = handler.getSingleFieldRows(1);//對應第二個定義的source
注意上面兩句,可以按照數據源定義的索引來取出來自不同數據源join后的數據了,以后有時間可能會改成按照別名來取出,大家看源碼的時候,會發現別名這個部分的接口都寫好了,要不你來幫助實現了吧。
5. 天生的多線程執行,即在mapper和reducer端都默認使用多線程來執行業務邏輯。
看看源碼吧,HST框架是并發調用flatMap和join方法的,同時又不能改變系統調用reduce方法的順序(否則hadoop的辛苦排序可就白瞎了),這可不是一件容易的事呢!
看到這里,有的同學說了。你這個HST好是好,但你搞的自動轉換類型這個機制可能會把性能拉下來的。這個嗎,不得不承認,可能是會有一點影響。但在生產環境做的比對可以證明,影響太小了,基本忽略不計。
筆者在生產環境做了做了多次試驗,mapper改成多線程后性能并未有提高,特別是對一些業務簡單的job,增加Transformer中的并發級別效率可能還會下降。
很多同學喜歡在mapper中做所謂“mapper端的join”。這種方式,相信在HST中通過提高mapper的并發級別后會有更好的表現。
Reducer中的性能相對原生提升的空間還是蠻大的。大部分的mapreduce項目,都是mapper簡單而reducer復雜,HST采用并發執行join的方式對提升reducer性能是超好的。
6. 對于多次迭代的任務,相連的兩個任務可以建立關聯,在流程上的下一個job直接引用上一個job的結果,使多次迭代任務的代碼結構變得清晰優美
雖然在最后才提到這一點,但這卻是我一開始想要寫HST原因。多次迭代的任務太麻煩了,上一個任務要寫在hdfs做存儲,下一個任務再取出使用,麻煩不麻煩。如果都由程序自動完成,豈不美哉!
在上一個任務里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各種source定義
.format("f1","f2")
在第二個任務中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
public static class Transformer2_0 extends PreviousResultTransformer<Long>
...
public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到沒,就是這么簡單。
在最開始的計劃中,我還設計了使用redis隊列來緩沖前面job的結果,供后面的job作為輸入。這樣本來必須嚴格串行的job可以在一定程度上并發。另外還設計了子任務的并發調度,這都留給以后去實現吧。
7. 便捷的自定義參數傳遞。
有時候,在業務中需要作一些“開關變量”,在運行時動態傳入不同的值以實現不同的業務邏輯。這個問題HST框架其實也為你考慮到了。
Driver中的自定義參數,source中的自定義參數都會以內置的方式傳到transformer或joiner中去,方便程序員書寫業務。
查看transformer或joiner的源碼就會發現:
getSourceParam(name)和getDriverParam(pIndex)方法,在計算節點輕松的得到在driver和source中設置的各層次級別的自定義參數,爽吧!
8. 其他工具
HST提供的方便還不止以上這些,比如在工具類中還提供了兩行數據(map類型)直接join的方法。這些都留給你自己去發現并實踐吧!
https://github.com/jonenine/HST