Posted on 2011-12-19 15:25
dennis 閱讀(14963)
評(píng)論(9) 編輯 收藏 所屬分類:
java 、
Hadoop與分布式
最近有朋友給我郵件問一些storm的問題,集中解答在這里。
一、我有一個(gè)數(shù)據(jù)文件,或者我有一個(gè)系統(tǒng)里面有數(shù)據(jù),怎么導(dǎo)入storm做計(jì)算?你需要實(shí)現(xiàn)一個(gè)Spout,Spout負(fù)責(zé)將數(shù)據(jù)emit到storm系統(tǒng)里,交給bolts計(jì)算。怎么實(shí)現(xiàn)spout可以參考官方的kestrel spout實(shí)現(xiàn):
https://github.com/nathanmarz/storm-kestrel如果你的數(shù)據(jù)源不支持事務(wù)性消費(fèi),那么就無法得到storm提供的可靠處理的保證,也沒必要實(shí)現(xiàn)ISpout接口中的ack和fail方法。
二、Storm為了保證tuple的可靠處理,需要保存tuple信息,這會(huì)不會(huì)導(dǎo)致內(nèi)存OOM?Storm為了保證tuple的可靠處理,acker會(huì)保存該節(jié)點(diǎn)創(chuàng)建的tuple id的xor值,這稱為ack value,那么每ack一次,就將tuple id和ack value做異或(xor)。當(dāng)所有產(chǎn)生的tuple都被ack的時(shí)候, ack value一定為0。這是個(gè)很簡(jiǎn)單的策略,對(duì)于每一個(gè)tuple也只要占用約20個(gè)字節(jié)的內(nèi)存。對(duì)于100萬tuple,也才20M左右。關(guān)于可靠處理看這個(gè):
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing三、Storm計(jì)算后的結(jié)果保存在哪里?可以保存在外部存儲(chǔ)嗎?Storm不處理計(jì)算結(jié)果的保存,這是應(yīng)用代碼需要負(fù)責(zé)的事情,如果數(shù)據(jù)不大,你可以簡(jiǎn)單地保存在內(nèi)存里,也可以每次都更新數(shù)據(jù)庫,也可以采用NoSQL存儲(chǔ)。storm并沒有像s4那樣提供一個(gè)Persist API,根據(jù)時(shí)間或者容量來做存儲(chǔ)輸出。這部分事情完全交給用戶。
數(shù)據(jù)存儲(chǔ)之后的展現(xiàn),也是你需要自己處理的,storm UI只提供對(duì)topology的監(jiān)控和統(tǒng)計(jì)。
四、Storm怎么處理重復(fù)的tuple?因?yàn)镾torm要保證tuple的可靠處理,當(dāng)tuple處理失敗或者超時(shí)的時(shí)候,spout會(huì)fail并重新發(fā)送該tuple,那么就會(huì)有tuple重復(fù)計(jì)算的問題。這個(gè)問題是很難解決的,storm也沒有提供機(jī)制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。因?yàn)閷?shí)時(shí)計(jì)算通常并不要求很高的精確度,后續(xù)的批處理計(jì)算會(huì)更正實(shí)時(shí)計(jì)算的誤差。
(2)使用第三方集中存儲(chǔ)來過濾,比如利用mysql,memcached或者redis根據(jù)邏輯主鍵來去重。
(3)使用bloom filter做過濾,簡(jiǎn)單高效。
五、Storm的動(dòng)態(tài)增刪節(jié)點(diǎn)我在storm和s4里比較里談到的動(dòng)態(tài)增刪節(jié)點(diǎn),是指storm可以動(dòng)態(tài)地添加和減少supervisor節(jié)點(diǎn)。對(duì)于減少節(jié)點(diǎn)來說,被移除的supervisor上的worker會(huì)被nimbus重新負(fù)載均衡到其他supervisor節(jié)點(diǎn)上。在storm 0.6.1以前的版本,增加supervisor節(jié)點(diǎn)不會(huì)影響現(xiàn)有的topology,也就是現(xiàn)有的topology不會(huì)重新負(fù)載均衡到新的節(jié)點(diǎn)上,在擴(kuò)展集群的時(shí)候很不方便,需要重新提交topology。因此我在storm的郵件列表里提了這個(gè)問題,storm的開發(fā)者nathanmarz創(chuàng)建了一個(gè)issue 54并在0.6.1提供了rebalance命令來讓正在運(yùn)行的topology重新負(fù)載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54和0.6.1的變更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246storm并不提供機(jī)制來動(dòng)態(tài)調(diào)整worker和task數(shù)目。
六、Storm UI里spout統(tǒng)計(jì)的complete latency的具體含義是什么?為什么emit的數(shù)目會(huì)是acked的兩倍?這個(gè)事實(shí)上是storm郵件列表里的一個(gè)問題。Storm作者marz的解答:
The complete latency is the time from the spout emitting a tuple to that
tuple being acked on the spout. So it tracks the time for the whole tuple
tree to be processed.
If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream. This is the spout
communicating with the ackers which take care of tracking the tuple trees.
簡(jiǎn)單地說,complete latency表示了tuple從emit到被acked經(jīng)過的時(shí)間,可以認(rèn)為是tuple以及該tuple的后續(xù)子孫(形成一棵樹)整個(gè)處理時(shí)間。其次spout的emit和transfered還統(tǒng)計(jì)了spout和acker之間內(nèi)部的通信信息,比如對(duì)于可靠處理的spout來說,會(huì)在emit的時(shí)候同時(shí)發(fā)送一個(gè)_ack_init給acker,記錄tuple id到task id的映射,以便ack的時(shí)候能找到正確的acker task。