Posted on 2011-12-19 15:25
dennis 閱讀(14962)
評論(9) 編輯 收藏 所屬分類:
java 、
Hadoop與分布式
最近有朋友給我郵件問一些storm的問題,集中解答在這里。
一、我有一個數(shù)據(jù)文件,或者我有一個系統(tǒng)里面有數(shù)據(jù),怎么導入storm做計算?你需要實現(xiàn)一個Spout,Spout負責將數(shù)據(jù)emit到storm系統(tǒng)里,交給bolts計算。怎么實現(xiàn)spout可以參考官方的kestrel spout實現(xiàn):
https://github.com/nathanmarz/storm-kestrel如果你的數(shù)據(jù)源不支持事務(wù)性消費,那么就無法得到storm提供的可靠處理的保證,也沒必要實現(xiàn)ISpout接口中的ack和fail方法。
二、Storm為了保證tuple的可靠處理,需要保存tuple信息,這會不會導致內(nèi)存OOM?Storm為了保證tuple的可靠處理,acker會保存該節(jié)點創(chuàng)建的tuple id的xor值,這稱為ack value,那么每ack一次,就將tuple id和ack value做異或(xor)。當所有產(chǎn)生的tuple都被ack的時候, ack value一定為0。這是個很簡單的策略,對于每一個tuple也只要占用約20個字節(jié)的內(nèi)存。對于100萬tuple,也才20M左右。關(guān)于可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing三、Storm計算后的結(jié)果保存在哪里?可以保存在外部存儲嗎?Storm不處理計算結(jié)果的保存,這是應(yīng)用代碼需要負責的事情,如果數(shù)據(jù)不大,你可以簡單地保存在內(nèi)存里,也可以每次都更新數(shù)據(jù)庫,也可以采用NoSQL存儲。storm并沒有像s4那樣提供一個Persist API,根據(jù)時間或者容量來做存儲輸出。這部分事情完全交給用戶。
數(shù)據(jù)存儲之后的展現(xiàn),也是你需要自己處理的,storm UI只提供對topology的監(jiān)控和統(tǒng)計。
四、Storm怎么處理重復的tuple?因為Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail并重新發(fā)送該tuple,那么就會有tuple重復計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。因為實時計算通常并不要求很高的精確度,后續(xù)的批處理計算會更正實時計算的誤差。
(2)使用第三方集中存儲來過濾,比如利用mysql,memcached或者redis根據(jù)邏輯主鍵來去重。
(3)使用bloom filter做過濾,簡單高效。
五、Storm的動態(tài)增刪節(jié)點我在storm和s4里比較里談到的動態(tài)增刪節(jié)點,是指storm可以動態(tài)地添加和減少supervisor節(jié)點。對于減少節(jié)點來說,被移除的supervisor上的worker會被nimbus重新負載均衡到其他supervisor節(jié)點上。在storm 0.6.1以前的版本,增加supervisor節(jié)點不會影響現(xiàn)有的topology,也就是現(xiàn)有的topology不會重新負載均衡到新的節(jié)點上,在擴展集群的時候很不方便,需要重新提交topology。因此我在storm的郵件列表里提了這個問題,storm的開發(fā)者nathanmarz創(chuàng)建了一個issue 54并在0.6.1提供了rebalance命令來讓正在運行的topology重新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54和0.6.1的變更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246storm并不提供機制來動態(tài)調(diào)整worker和task數(shù)目。
六、Storm UI里spout統(tǒng)計的complete latency的具體含義是什么?為什么emit的數(shù)目會是acked的兩倍?這個事實上是storm郵件列表里的一個問題。Storm作者marz的解答:
The complete latency is the time from the spout emitting a tuple to that
tuple being acked on the spout. So it tracks the time for the whole tuple
tree to be processed.
If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream. This is the spout
communicating with the ackers which take care of tracking the tuple trees.
簡單地說,complete latency表示了tuple從emit到被acked經(jīng)過的時間,可以認為是tuple以及該tuple的后續(xù)子孫(形成一棵樹)整個處理時間。其次spout的emit和transfered還統(tǒng)計了spout和acker之間內(nèi)部的通信信息,比如對于可靠處理的spout來說,會在emit的時候同時發(fā)送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。