<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢(mèng)蝶

    生活、程序、未來(lái)
       :: 首頁(yè) ::  ::  :: 聚合  :: 管理

    Storm源碼淺析之topology的提交

    Posted on 2011-12-01 21:48 dennis 閱讀(15219) 評(píng)論(10)  編輯  收藏 所屬分類: java源碼解讀Hadoop與分布式Clojure
        原文:http://www.tkk7.com/killme2008/archive/2011/11/17/364112.html
        作者:dennis (killme2008@gmail.com)
        轉(zhuǎn)載請(qǐng)注明出處。

        最近一直在讀twitter開(kāi)源的這個(gè)分布式流計(jì)算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進(jìn)行組織,并且只分析我關(guān)注的東西,因此稱之為淺析。       

    一、介紹
        Storm的開(kāi)發(fā)語(yǔ)言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫(xiě)核心邏輯。源碼統(tǒng)計(jì)結(jié)果:
         180 text files.
         
    177 unique files.                                          
           
    7 files ignored.

    http:
    //cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
    -------------------------------------------------------------------------------
    Language                     files          blank        comment           code
    -------------------------------------------------------------------------------
    Java                           
    125           5010           2414          25661
    Lisp                            
    33            732            283           4871
    Python                           
    7            742            433           4675
    CSS                              
    1             12             45           1837
    ruby                             
    2             22              0            104
    Bourne Shell                     
    1              0              0              6
    Javascript                       
    2              1             15              6
    -------------------------------------------------------------------------------
    SUM:                           
    171           6519           3190          37160
    -------------------------------------------------------------------------------

        Java代碼25000多行,而Clojure(Lisp)只有4871行,說(shuō)語(yǔ)言不重要再次證明是扯淡。
            
    二、Topology和Nimbus       
        Topology是storm的核心理念,將spout和bolt組織成一個(gè)topology,運(yùn)行在storm集群里,完成實(shí)時(shí)分析和計(jì)算的任務(wù)。這里我主要想介紹下topology部署到storm集群的大概過(guò)程。提交一個(gè)topology任務(wù)到Storm集群是通過(guò)StormSubmitter.submitTopology方法提交:
    StormSubmitter.submitTopology(name, conf, builder.createTopology());
        我們將topology打成jar包后,利用bin/storm這個(gè)python腳本,執(zhí)行如下命令:
    bin/storm jar xxxx.jar com.taobao.MyTopology args
        將jar包提交給storm集群。storm腳本會(huì)啟動(dòng)JVM執(zhí)行Topology的main方法,執(zhí)行submitTopology的過(guò)程。而submitTopology會(huì)將jar文件上傳到nimbus,上傳是通過(guò)socket傳輸。在storm這個(gè)python腳本的jar方法里可以看到:
    def jar(jarfile, klass, *args):                                                                                                                               
       exec_storm_class(                                                                                                                                          
            klass,                                                                                                                                                
            jvmtype
    ="-client",                                                                                                                                    
            extrajars
    =[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
            args
    =args,                                                                                                                                            
            prefix
    ="export STORM_JAR=" + jarfile + ";")
         將jar文件的地址設(shè)置為環(huán)境變量STORM_JAR,這個(gè)環(huán)境變量在執(zhí)行submitTopology的時(shí)候用到:
    //StormSubmitter.java 
    private static void submitJar(Map conf) {
            
    if(submittedJar==null) {
                LOG.info(
    "Jar not uploaded to master yet. Submitting jar");
                String localJar 
    = System.getenv("STORM_JAR");
                submittedJar 
    = submitJar(conf, localJar);
            } 
    else {
                LOG.info(
    "Jar already uploaded to master. Not submitting jar.");
            }
        }
        通過(guò)環(huán)境變量找到j(luò)ar包的地址,然后上傳。利用環(huán)境變量傳參是個(gè)小技巧。

        其次,nimbus在接收到j(luò)ar文件后,存放到數(shù)據(jù)目錄的inbox目錄,nimbus數(shù)據(jù)目錄的結(jié)構(gòu)
    -nimbus
         
    -inbox
             
    -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
             
    -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

         
    -stormdist
            
    -storm-id
               
    -stormjar.jar
               
    -stormconf.ser
               
    -stormcode.ser
         其中inbox用于存放提交的jar文件,每個(gè)jar文件都重命名為stormjar加上一個(gè)32位的UUID。而stormdist存放的是啟動(dòng)topology后生成的文件,每個(gè)topology都分配一個(gè)唯一的id,ID的規(guī)則是“name-計(jì)數(shù)-時(shí)間戳”。啟動(dòng)后的topology的jar文件名命名為storm.jar ,而它的配置經(jīng)過(guò)java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時(shí)候,supervisor會(huì)從這個(gè)目錄下載這些文件,然后在supervisor本地執(zhí)行這些代碼。
        進(jìn)入重點(diǎn),topology任務(wù)的分配過(guò)程(zookeeper路徑說(shuō)明忽略root):
    1.在zookeeper上創(chuàng)建/taskheartbeats/{storm id} 路徑,用于任務(wù)的心跳檢測(cè)。storm對(duì)zookeeper的一個(gè)重要應(yīng)用就是利用zk的臨時(shí)節(jié)點(diǎn)做存活檢測(cè)。task將定時(shí)刷新節(jié)點(diǎn)的時(shí)間戳,然后nimbus會(huì)檢測(cè)這個(gè)時(shí)間戳是否超過(guò)timeout設(shè)置。
    2.從topology中獲取bolts,spouts設(shè)置的并行數(shù)目以及全局配置的最大并行數(shù),然后產(chǎn)生task id列表,如[1 2 3 4]
    3.在zookeeper上創(chuàng)建/tasks/{strom id}/{task id}路徑,并存儲(chǔ)task信息
    4.開(kāi)始分配任務(wù)(內(nèi)部稱為assignment), 具體步驟:
     (1)從zk上獲得已有的assignment(新的toplogy當(dāng)然沒(méi)有了)
     (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個(gè)worker的端口。
     (3)將任務(wù)均勻地分配給可用的worker,這里有兩種情況:
     (a)task數(shù)目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
    {1: [host1:port1] 2 : [host2:port1]
             
    3 : [host1:port1] 4 : [host2:port1]}
    ,可以看到任務(wù)平均地分配在兩個(gè)worker上。
    (b)如果task數(shù)目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會(huì)將woker排序,將不同host間隔排列,保證task不會(huì)全部分配到同一個(gè)worker上,也就是將worker排列成
    [host1:port1 host2:port1 host1:port2 host2:port2]
    ,然后分配任務(wù)為
    {1: host1:port1 , 2 : host2:port2}

    (4)記錄啟動(dòng)時(shí)間
    (5)判斷現(xiàn)有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
    5.啟動(dòng)topology,所謂啟動(dòng),只是將zookeeper上/storms/{storm id}對(duì)應(yīng)的數(shù)據(jù)里的active設(shè)置為true。
    6.nimbus會(huì)檢查task的心跳,如果發(fā)現(xiàn)task心跳超過(guò)超時(shí)時(shí)間,那么會(huì)重新跳到第4步做re-assignment。

    評(píng)論

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-05 11:11 by
    Java代碼25000多行,而Clojure(Lisp)只有4871行5563。。。不明白是什么意思?是Storm中既有Java,又有Clojure?還是Storm有Java和Clojure兩個(gè)版本?

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-14 23:56 by fiw
    非常感謝您的講解,給了我很大的幫助。有一個(gè)問(wèn)題,Storm最后處理完的消息存到哪里了呢?如何查看處理的結(jié)果呢?我自己搭了一個(gè)Storm集群,跑了一下Storm_starter的wordCount例子,但是在StormUI上沒(méi)有找到結(jié)果,希望能得到您的幫助。

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-15 12:02 by dennis
    @fiw
    處理完的消息怎么存儲(chǔ)是你自己負(fù)責(zé)的事情,storm不幫你處理的。
    wordcount的例子應(yīng)該就是放在內(nèi)存里了,掛了就沒(méi)了。
    storm ui只是統(tǒng)計(jì),并不參與topology的邏輯展現(xiàn)。

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-15 23:08 by coderplay
    很像hadoop :)

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-16 16:58 by fork
    storm為了保證可靠性處理是否必須要存儲(chǔ)還沒(méi)有完全處理的Turple?這樣發(fā)送Turple的Spout是否會(huì)出現(xiàn)OOM?

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2011-12-19 14:58 by dennis
    @fork
    不會(huì)的,storm只會(huì)存儲(chǔ)發(fā)送的tuple id,這只是一個(gè)8個(gè)字節(jié)的long類型,想要OOM還是比較困難的。

    # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

    2012-01-06 11:34 by xiaofeng_metis
    期待Storm源碼淺析的其它內(nèi)容

    # re: Storm源碼淺析之topology的提交[未登錄](méi)  回復(fù)  更多評(píng)論   

    2012-02-20 01:03 by 胡楊
    前輩,您好!看到您讀過(guò)那么多的源代碼,真的是發(fā)自內(nèi)心的崇拜,你的精力好旺盛,你對(duì)技術(shù)真的好執(zhí)著!

    現(xiàn)在這幾天準(zhǔn)備開(kāi)始讀讀spring的源碼。但是把源代碼導(dǎo)入Eclipse并運(yùn)行起來(lái)以后,在spring啟動(dòng)的入口打了個(gè)斷點(diǎn),可是總是進(jìn)入不了這個(gè)斷點(diǎn)。在網(wǎng)上查了很多的資料,有的說(shuō)是要編譯一下源代碼,我試過(guò)了,也不行,這個(gè)問(wèn)題困擾了好幾天。不知道您剛開(kāi)始的時(shí)候是怎么閱讀的? 用的什么工具?

    # re: Storm源碼淺析之topology的提交[未登錄](méi)  回復(fù)  更多評(píng)論   

    2012-04-11 14:47 by dhc
    Storm中既有Java,又有Clojure

    # re: Storm源碼淺析之topology的提交[未登錄](méi)  回復(fù)  更多評(píng)論   

    2012-04-11 14:50 by dhc
    你好,看了這篇文章很多以前沒(méi)有明白的點(diǎn)豁然開(kāi)朗。但是這篇文章只是分析了storm client、nimbus上的流程,能夠介紹下supervisor啟動(dòng)后的流程。謝謝!
    主站蜘蛛池模板: 亚洲一区视频在线播放| 亚洲老妈激情一区二区三区| 成人免费黄色网址| 美国免费高清一级毛片| 久久精品7亚洲午夜a| 成人毛片免费在线观看| 国产成人免费网站| 丁香花在线视频观看免费| 最近更新免费中文字幕大全| 亚洲精品美女久久7777777| 激情五月亚洲色图| 亚洲一二成人精品区| 亚洲精品视频在线看| 亚洲欧洲中文日韩av乱码| 国产成人麻豆亚洲综合无码精品| 操美女视频免费网站| 手机在线免费视频| 永久免费观看的毛片的网站| 可以免费观看一级毛片黄a| 国产免费毛不卡片| 免费观看毛片视频| 日韩亚洲精品福利| 天天摸夜夜摸成人免费视频| 国产一精品一aⅴ一免费| 免费看黄视频网站| 国内自产少妇自拍区免费| 夜色阁亚洲一区二区三区| 亚洲中文字幕无码一区| 亚洲综合无码一区二区| 亚洲人精品亚洲人成在线| 麻豆亚洲AV成人无码久久精品| 亚洲最大无码中文字幕| 黄色片网站在线免费观看| 中文字幕高清免费不卡视频 | 日韩免费a级在线观看| 黄在线观看www免费看| 精品国产免费一区二区| 久久亚洲国产成人影院网站| 亚洲情a成黄在线观看| 五月天网站亚洲小说| 亚洲乱码在线卡一卡二卡新区|