<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢蝶

    生活、程序、未來
       :: 首頁 ::  ::  :: 聚合  :: 管理

    Storm源碼淺析之topology的提交

    Posted on 2011-12-01 21:48 dennis 閱讀(15217) 評論(10)  編輯  收藏 所屬分類: java源碼解讀Hadoop與分布式Clojure
        原文:http://www.tkk7.com/killme2008/archive/2011/11/17/364112.html
        作者:dennis (killme2008@gmail.com)
        轉載請注明出處。

        最近一直在讀twitter開源的這個分布式流計算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進行組織,并且只分析我關注的東西,因此稱之為淺析。       

    一、介紹
        Storm的開發語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統計結果:
         180 text files.
         
    177 unique files.                                          
           
    7 files ignored.

    http:
    //cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
    -------------------------------------------------------------------------------
    Language                     files          blank        comment           code
    -------------------------------------------------------------------------------
    Java                           
    125           5010           2414          25661
    Lisp                            
    33            732            283           4871
    Python                           
    7            742            433           4675
    CSS                              
    1             12             45           1837
    ruby                             
    2             22              0            104
    Bourne Shell                     
    1              0              0              6
    Javascript                       
    2              1             15              6
    -------------------------------------------------------------------------------
    SUM:                           
    171           6519           3190          37160
    -------------------------------------------------------------------------------

        Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
            
    二、Topology和Nimbus       
        Topology是storm的核心理念,將spout和bolt組織成一個topology,運行在storm集群里,完成實時分析和計算的任務。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務到Storm集群是通過StormSubmitter.submitTopology方法提交:
    StormSubmitter.submitTopology(name, conf, builder.createTopology());
        我們將topology打成jar包后,利用bin/storm這個python腳本,執行如下命令:
    bin/storm jar xxxx.jar com.taobao.MyTopology args
        將jar包提交給storm集群。storm腳本會啟動JVM執行Topology的main方法,執行submitTopology的過程。而submitTopology會將jar文件上傳到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:
    def jar(jarfile, klass, *args):                                                                                                                               
       exec_storm_class(                                                                                                                                          
            klass,                                                                                                                                                
            jvmtype
    ="-client",                                                                                                                                    
            extrajars
    =[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
            args
    =args,                                                                                                                                            
            prefix
    ="export STORM_JAR=" + jarfile + ";")
         將jar文件的地址設置為環境變量STORM_JAR,這個環境變量在執行submitTopology的時候用到:
    //StormSubmitter.java 
    private static void submitJar(Map conf) {
            
    if(submittedJar==null) {
                LOG.info(
    "Jar not uploaded to master yet. Submitting jar");
                String localJar 
    = System.getenv("STORM_JAR");
                submittedJar 
    = submitJar(conf, localJar);
            } 
    else {
                LOG.info(
    "Jar already uploaded to master. Not submitting jar.");
            }
        }
        通過環境變量找到jar包的地址,然后上傳。利用環境變量傳參是個小技巧。

        其次,nimbus在接收到jar文件后,存放到數據目錄的inbox目錄,nimbus數據目錄的結構
    -nimbus
         
    -inbox
             
    -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
             
    -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

         
    -stormdist
            
    -storm-id
               
    -stormjar.jar
               
    -stormconf.ser
               
    -stormcode.ser
         其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執行這些代碼。
        進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
    1.在zookeeper上創建/taskheartbeats/{storm id} 路徑,用于任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時刷新節點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
    2.從topology中獲取bolts,spouts設置的并行數目以及全局配置的最大并行數,然后產生task id列表,如[1 2 3 4]
    3.在zookeeper上創建/tasks/{strom id}/{task id}路徑,并存儲task信息
    4.開始分配任務(內部稱為assignment), 具體步驟:
     (1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
     (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
     (3)將任務均勻地分配給可用的worker,這里有兩種情況:
     (a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
    {1: [host1:port1] 2 : [host2:port1]
             
    3 : [host1:port1] 4 : [host2:port1]}
    ,可以看到任務平均地分配在兩個worker上。
    (b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
    [host1:port1 host2:port1 host1:port2 host2:port2]
    ,然后分配任務為
    {1: host1:port1 , 2 : host2:port2}

    (4)記錄啟動時間
    (5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
    5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數據里的active設置為true。
    6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那么會重新跳到第4步做re-assignment。

    評論

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-05 11:11 by
    Java代碼25000多行,而Clojure(Lisp)只有4871行5563。。。不明白是什么意思?是Storm中既有Java,又有Clojure?還是Storm有Java和Clojure兩個版本?

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-14 23:56 by fiw
    非常感謝您的講解,給了我很大的幫助。有一個問題,Storm最后處理完的消息存到哪里了呢?如何查看處理的結果呢?我自己搭了一個Storm集群,跑了一下Storm_starter的wordCount例子,但是在StormUI上沒有找到結果,希望能得到您的幫助。

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-15 12:02 by dennis
    @fiw
    處理完的消息怎么存儲是你自己負責的事情,storm不幫你處理的。
    wordcount的例子應該就是放在內存里了,掛了就沒了。
    storm ui只是統計,并不參與topology的邏輯展現。

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-15 23:08 by coderplay
    很像hadoop :)

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-16 16:58 by fork
    storm為了保證可靠性處理是否必須要存儲還沒有完全處理的Turple?這樣發送Turple的Spout是否會出現OOM?

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2011-12-19 14:58 by dennis
    @fork
    不會的,storm只會存儲發送的tuple id,這只是一個8個字節的long類型,想要OOM還是比較困難的。

    # re: Storm源碼淺析之topology的提交  回復  更多評論   

    2012-01-06 11:34 by xiaofeng_metis
    期待Storm源碼淺析的其它內容

    # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

    2012-02-20 01:03 by 胡楊
    前輩,您好!看到您讀過那么多的源代碼,真的是發自內心的崇拜,你的精力好旺盛,你對技術真的好執著!

    現在這幾天準備開始讀讀spring的源碼。但是把源代碼導入Eclipse并運行起來以后,在spring啟動的入口打了個斷點,可是總是進入不了這個斷點。在網上查了很多的資料,有的說是要編譯一下源代碼,我試過了,也不行,這個問題困擾了好幾天。不知道您剛開始的時候是怎么閱讀的? 用的什么工具?

    # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

    2012-04-11 14:47 by dhc
    Storm中既有Java,又有Clojure

    # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

    2012-04-11 14:50 by dhc
    你好,看了這篇文章很多以前沒有明白的點豁然開朗。但是這篇文章只是分析了storm client、nimbus上的流程,能夠介紹下supervisor啟動后的流程。謝謝!
    主站蜘蛛池模板: 国产午夜亚洲不卡| 亚洲精品国产综合久久一线| 久久亚洲精品成人777大小说| 一级做性色a爰片久久毛片免费| 伊人久久亚洲综合影院| 黄网站色视频免费观看45分钟| 在线观看亚洲免费视频| 日日狠狠久久偷偷色综合免费 | 性xxxx黑人与亚洲| 欧美最猛性xxxxx免费| 日本免费网址大全在线观看| 中文字幕 亚洲 有码 在线 | 亚洲免费在线观看| 久久99国产亚洲高清观看首页| 免费的全黄一级录像带| 久久精品a亚洲国产v高清不卡| 亚洲一区二区三区免费视频| 亚洲入口无毒网址你懂的| 国产精品麻豆免费版| 国产高清视频免费在线观看| 久久亚洲国产精品五月天| 999国内精品永久免费视频| 国产亚洲欧美在线观看| 亚洲一级Av无码毛片久久精品| 日本高清高色视频免费| avtt天堂网手机版亚洲| 亚洲国产V高清在线观看| 无码精品国产一区二区三区免费 | 亚洲一区无码精品色| 99re免费视频| 精品久久久久久久久亚洲偷窥女厕| 亚洲女同成人AⅤ人片在线观看| 国产午夜不卡AV免费| 亚洲日韩精品无码专区加勒比| 亚洲综合色成在线播放| 18pao国产成视频永久免费| 国产精品久久久久久亚洲小说| 亚洲av无码国产精品夜色午夜| 女人被弄到高潮的免费视频| 二个人看的www免费视频| 亚洲av产在线精品亚洲第一站|