移植jQuery deferred到java,基于java的promise編程模型
很多語言都支持promise編程模型,像是scala中promise類和jquery(javascript)中的deferred對象等,在java中好像缺少相關實現。筆者不得以,只能自己動手弄了一個。最后選擇將jquery中的deferred對象移植到java中來的方案。目前已經應用在企業級項目的高性能服務器和android客戶端等項目中。
Promise編程模型的概念這里也不再贅述,大家自己上網查找即可。這種編程模型主要解決的問題就是“同步調用變異步的問題”,通常解決異步調用的方式是使用“回調”。但普通回調的使用在代碼書寫,返回值傳遞和“異步方法編排上”非常的不方便。所以才會有Promise模型的誕生。
這次會介紹java版的deferred對象的使用方法,以及用jquery版之間的變化和改進。目前開放的版本是基于線程池的版本,正在開發基于akka的版本。在jquery的實現中,因為javascript是單線程的,所以不用考慮線程同步的問題。在java線程池的版的deferred里,基于多線程環境做了很多測試,保證了線程安全及可靠性。
一. 基本調用形式
final Deferred def = new Deferred (App. executor);
執行某個異步調用,比如某個基于網絡的異步服務
callService(new Response(){
public void onMessage(Object message){
def.resolve(message);
}
Public void onFail(Exception e){
def.reject(e);
}
});
你可以在構造Deferred 對象后的任意時候,使用def的then方法。比如
def.then(new Reply(){
public Object done(Object d) {
System.out.println("response:"+d);
return d;
}
public void fail(Object f) {
System.out.println("error:"+f);
}
});
一個經常遇到的場景是callService后將def作為參數傳遞到其他方法,在其他方法內部再決定def要綁定什么樣的后續動作,也就是綁定什么樣的then。
注意then方法的定義public Object done(Object d),在實際使用中done通常是以“處理鏈”的方式來使用的,即你會看到def.then().then().then()…這樣的方式,每一個then的done方法接收的參數都是其上一個then的done方法的返回值。通常作為參數傳遞給某個方法的Deferred上面已經綁定了一些默認的then對象,來處理一些必要的步驟。比如對接收報文的初步解碼。
注意同在Reply接口中fail方法是沒有返回值的,一旦異步處理鏈上的某個Deferred被reject,其本身及后面所有的Deferred綁定的then都會被觸發fail方法。這保證了整個業務編排上或是你精心設計的算法編排上任意一個環節,無論如何都會得到響應,這也是Promise模型關于異常的最重要的處理方式。
Promise編程模型本身是強健的,但異步服務卻不是總能得到響應。在實際應用中,每一個作為計算或業務環節的Deferred都應該被定時輪詢,以保證在異步服務徹底得不到響應的時候(比如你執行了一個數據庫查詢,但過了很長很長時間仍沒有得到回應),可以給Deferred對象reject一個超時錯誤。
響應處理對象then中方法done和fail都是不允許拋出任何異常的,特別是done方法,如果你的算法依賴異常,請在done中加上try…catch,并將異常傳換成下一個then可以理解的信息,以便這個Deferred處理鏈中可以正常執行下去。
二. pipe到另外一個異步處理流程上去
假如你有如下的業務場景,你需要順序調用三個異步的webservice服務來得到最終的返回結果,其中沒個webservice的入參都和上一個的異步返回結果相關。(注意,異步的webservice是調用之后,服務端立刻返回,服務端處理完成后再主動訪問剛才的請求方返回結果的方式)如果將這種webservice調用封裝成同步方法無疑在編程上是非常方便的,可以使用我們平常寫程序時順序的書寫方式,比如
reval1 = callwebservice1(param0)
reval2 = callwebservice2(reval1)
reval3 = callwebservice3(reval2)
方便的同時卻犧牲了性能。調用線程要在callwebservice方法內阻塞,以等待異步返回。這樣的編程方法無法滿足高性能及高并發的需要。那么有沒有既能類似于平常寫程序時順序的書寫方式又能滿足異步無阻塞的需要呢,這就是Promise編程模型本身要解決的最大問題。
通常解決這種問題的方式是使用pipe,pipe這個方法名稱的由來應該是來自于linux shell的管道符,即“|”
使用Deferred對象的解決方案類似于如下:
Deferred.resolvedDeferred(App.executor,param0).pipe(new AsyncRequest2(){
public void apply(Object param0,final Deferred newDefered) throws Exception{
asyncCallwebservice1(param0).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval1,final Deferred newDefered) throws Exception{
asyncCallwebservice2(reval1).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval2,final Deferred newDefered) throws Exception{
asyncCallwebservice3(reval3).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).then(new new Reply(){
public Object done(Object d) {
//在這里消費最終結果
return d;
}
public void fail(Object f) {
}
});
使用Deferred對象提供的方案好處就是,所有的調用都是異步的,上面這一連串代碼立刻就會返回。所有的業務編排會按照書寫順序在線程池中的線程里被調用,你也不必擔心返回值結果和參數傳遞過程中的線程安全問題,框架在關鍵位置都做了同步,也做了相當多的測試用于驗證。
可以看出,對于異步方法調用而言,比較難以解決的問題是異步算法的編排問題。Deferred對象為異步算法提供了很好的解決方案。
相較于AsyncRequest2類還有一個AsyncRequest1類,接口如下:
public interface AsyncRequest1<R> {
public Deferred apply(R result) throws Exception;
}
這個類要求在在apply方法中要自己創建Deferred對象。
三. 一些小改進
相較于傳統promise編程模型,在java多線程環境下做了一些小升級。這里主要介紹synchronize方法
Synchronize方法簽名如下:
Deferred synchronize(ExecutorService executor,Deferred... deferreds)
實際上,synchronize方法將眾多的Deferred對象的完成狀態同歸集到一個唯一的Deferred對象上去,即如果所有的Deferred對象參數都resolved了,作為最終結果的Deferred也resolve,如果眾多的Deferred對象參數有一個reject了,最終的那個Deferred也會立即reject(其他參數的狀態都舍棄)。
這個方法一般用于多個并行流程最終狀態的“歸并”中。
除了synchronize,框架還提供一些傳統promise編程模型沒有的改進,比如pipe4fail和source等。
四.在android項目中的應用
(略)
https://github.com/jonenine/javaDeferred
https://github.com/jonenine/HST
雖然大數據的發展已經將近10個年頭了,hadoop技術仍然沒有過時,特別是一些低成本,入門級的小項目,使用hadoop還是蠻不錯的。而且,也不是每一個公司都有能力招聘和培養自己的spark人才。
我本人對于hadoop mapreduce是有一些意見的。hadoop mapreduce技術對于開發人員的友好度不高,程序難寫,調試困難,對于復雜的業務邏輯遠沒有spark得心應手。
2016年的春節前接到一個任務,要在一個沒有spark的平臺實現電力系統的一些統計分析算法,可選的技術只有hadoop mapreduce。受了這個刺激之后產生了一些奇思妙想,然后做了一些試驗,并最終形成HST---hadoop simplize toolkit,還真是無心載柳柳成蔭啊。
HST基本優點如下:
屏蔽了hadoop數據類型,取消了driver,將mapper和reducer轉化為transformer和joiner,業務邏輯更接近sql。相當程度的減少了代碼量,極大的降低了大數據編程的門檻,讓基層程序員通過簡單的學習即可掌握大數據的開發。
克服了hadoop mapreduce數據源單一的情況,比如在一個job內,input可以同時讀文件和來自不同集群的hbase。
遠程日志系統,讓mapper和reducer的日志集中到driver的控制臺,極大減輕了并行多進程程序的調試難度。
克服了hadoop mapreduce編寫業務邏輯時,不容易區分數據來自哪個數據源的困難。接近了spark(或者sql)的水平。
天生的多線程執行,即在mapper和reducer端都默認使用多線程來執行業務邏輯。
對于多次迭代的任務,相連的兩個任務可以建立關聯,下一個任務直接引用上一個任務的結果,使多次迭代任務的代碼結構變得清晰優美。
以下會逐條說明
基本概念的小變化:
Source類代替了hadoop Input體系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了飽受詬病的Driver,改為內置的實現,現在完全不用操心了。
1. 基本上,屏蔽了hadoop的數據類型,使用純java類型
在原生的hadoop mapreduce開發中,使用org.apache.hadoop.io包下的各種hadoop數據類型,比如hadoop的Text類型,算法的編寫中一些轉換非常不方便。而在HST中一律使用java基本類型,完全屏蔽了hadoop類型體系。
比如在hbase作為source(Input)的時候,再也不用直接使用ImmutableBytesWritable和Result了,HST為你做了自動的轉換。
現在的mapper(改名叫Transformer了)風格是這樣的
public static class TransformerForHBase0 extends HBaseTransformer<Long>
…
現在map方法叫flatmap,看到沒,已經幫你自動轉成了string和map
public void flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可閱讀xs.hadoop.iterated.IteratedUtil類中關于類型自動轉換的部分
2. 克服了hadoop mapreduce數據源單一的情況。比如在一個job內,數據源同時讀文件和hbase,這在原生的hadoop mapreduce是不可能做到的
以前訪問hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,現在完全改為與spark相似的方式。
現在的風格是這樣的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以來自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,現在完全由source類來代替。通過內置的機制轉化為inputformat,inputsplit和reader。在HST的框架下,其實可以很容易的寫出諸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一個hadoop job中,你終于可以將任意數據源,例如來自不同集群的HBASE和來自數據庫的source進行join了,這是多么happy的事情啊!
3. 遠程日志系統。讓mapper和reducer的日志集中在driver進行顯示,極大減輕了了并行多進程程序的調試難度
各位都體驗過,job fail后到控制臺頁面,甚至ssh到計算節點去查看日志的痛苦了吧。對,hadoop原生的開發,調試很痛苦的呢!
現在好了,有遠程日志系統,可以在調試時將mapper和reducer的日志集中在driver上,錯誤和各種counter也會自動發送到driver上,并實時顯示在你的控制臺上。如果在eclipse中調試程序,就可以實現點擊console中的錯誤,直接跳到錯誤代碼行的功能嘍!
Ps:有人可能會問,如何在集群外使用eclipse調試一個job,卻可以以集群方式運行呢?這里不再贅述了,網上有很多答案的哦
4. 克服了hadoop mapreduce在join上,區分數據來自哪個數據源的困難,接近spark(或者sql)的水平
在上面給出示例中,大家都看到了,現在的mapper可以綁定input嘍!,也就是每個input都有自己獨立的mapper。正因為此,現在的input和mapper改名叫Source和Transformer。
那么,大家又要問了,在mapper中,我已經可以輕松根據不同的數據輸入寫出不同的mapper了,那reducer中怎么辦,spark和sql都是很容易實現的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么輕松愉悅啊!
在原生hadoop mapreduce中,在reducer中找出哪個數據對應來自哪個input可是一個令人抓狂的問題呢!
現在這個問題已經被輕松解決嘍!看下面這個joiner,對應原生的reducer
public static class Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更貼近sql的概念呢?
public void join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//對應索引為0的source
List<Object> row2 = handler.getSingleFieldRows(1);//對應第二個定義的source
注意上面兩句,可以按照數據源定義的索引來取出來自不同數據源join后的數據了,以后有時間可能會改成按照別名來取出,大家看源碼的時候,會發現別名這個部分的接口都寫好了,要不你來幫助實現了吧。
5. 天生的多線程執行,即在mapper和reducer端都默認使用多線程來執行業務邏輯。
看看源碼吧,HST框架是并發調用flatMap和join方法的,同時又不能改變系統調用reduce方法的順序(否則hadoop的辛苦排序可就白瞎了),這可不是一件容易的事呢!
看到這里,有的同學說了。你這個HST好是好,但你搞的自動轉換類型這個機制可能會把性能拉下來的。這個嗎,不得不承認,可能是會有一點影響。但在生產環境做的比對可以證明,影響太小了,基本忽略不計。
筆者在生產環境做了做了多次試驗,mapper改成多線程后性能并未有提高,特別是對一些業務簡單的job,增加Transformer中的并發級別效率可能還會下降。
很多同學喜歡在mapper中做所謂“mapper端的join”。這種方式,相信在HST中通過提高mapper的并發級別后會有更好的表現。
Reducer中的性能相對原生提升的空間還是蠻大的。大部分的mapreduce項目,都是mapper簡單而reducer復雜,HST采用并發執行join的方式對提升reducer性能是超好的。
6. 對于多次迭代的任務,相連的兩個任務可以建立關聯,在流程上的下一個job直接引用上一個job的結果,使多次迭代任務的代碼結構變得清晰優美
雖然在最后才提到這一點,但這卻是我一開始想要寫HST原因。多次迭代的任務太麻煩了,上一個任務要寫在hdfs做存儲,下一個任務再取出使用,麻煩不麻煩。如果都由程序自動完成,豈不美哉!
在上一個任務里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各種source定義
.format("f1","f2")
在第二個任務中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
public static class Transformer2_0 extends PreviousResultTransformer<Long>
...
public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到沒,就是這么簡單。
在最開始的計劃中,我還設計了使用redis隊列來緩沖前面job的結果,供后面的job作為輸入。這樣本來必須嚴格串行的job可以在一定程度上并發。另外還設計了子任務的并發調度,這都留給以后去實現吧。
7. 便捷的自定義參數傳遞。
有時候,在業務中需要作一些“開關變量”,在運行時動態傳入不同的值以實現不同的業務邏輯。這個問題HST框架其實也為你考慮到了。
Driver中的自定義參數,source中的自定義參數都會以內置的方式傳到transformer或joiner中去,方便程序員書寫業務。
查看transformer或joiner的源碼就會發現:
getSourceParam(name)和getDriverParam(pIndex)方法,在計算節點輕松的得到在driver和source中設置的各層次級別的自定義參數,爽吧!
8. 其他工具
HST提供的方便還不止以上這些,比如在工具類中還提供了兩行數據(map類型)直接join的方法。這些都留給你自己去發現并實踐吧!
https://github.com/jonenine/HST