<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    歡迎光臨郝學武的blog。

    kafka設計理論備忘

    Posted on 2014-01-08 12:01 陜西BOY 閱讀(302) 評論(0)  編輯  收藏 所屬分類: kafka

    kafka的設計初衷是希望作為一個統一的信息收集平臺,能夠實時的收集反饋信息,并需要能夠支撐較大的數據量,且具備良好的容錯能力.

        1.Persistence

        kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且無論任何OS下,對文件系統本身的優化幾乎沒有可能.文件緩存/直接內存映射等是常用的手段.因為kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.

        2.Efficiency

        需要考慮的影響性能點很多,除磁盤IO之外,我們還需要考慮網絡IO,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對于producer端,可以將消息buffer起來,當消息的條數達到一定閥值時,批量發送給broker;對于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對于kafka broker端,似乎有個sendfile系統調用可以潛在的提升網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即可,而無需進程再次copy和交換.

     

        其實對于producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用消息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對于kafka而言,網絡IO更應該需要考慮.可以將任何在網絡上傳輸的消息都經過壓縮.kafka支持gzip/snappy等多種壓縮方式.

        3. Producer

        Load balancing: producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".事實上,消息被路由到哪個partition上,有producer客戶端決定.比如可以采用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現"消息均衡分發"是必要的.

        其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已經注冊了watch用來監聽partition leader的變更事件.

        Asynchronous send: 將多條消息暫且在客戶端buffer起來,并將他們批量發送到broker;小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效率;不過這也有一定的隱患,比如當producer失效時,那些尚未發送的消息將會丟失.

        4.Consumer

        consumer端向broker發送"fetch"請求,并告知其獲取消息的offset;此后consumer將會獲得一定條數的消息;consumer端也可以重置offset來重新消費消息.

        在JMS實現中,Topic模型基于push方式,即broker將消息推送給consumer端.不過在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull(或者說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息并處理,且可以控制消息消費的進度(offset);此外,消費者可以良好的控制消息消費的數量,batch fetch.

        其他JMS實現,消息消費的位置是有prodiver保留,以便避免重復發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊offset.由此可見,consumer客戶端也很輕量級.

        5.Message Delivery Semantics

        對于JMS實現,消息傳輸擔保非常直接:有且只有一次(exactly once).在kafka中稍有不同:

        1) at most once: 最多一次,這個和JMS中"非持久化"消息類似.發送一次,無論成敗,將不會重發.

        2) at least once: 消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功.

        3) exactly once: 消息只會發送一次.

        at most once: 消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理.那么此后"未處理"的消息將不能被fetch到,這就是"at most once".

        at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once".

        exactly once: kafka中并沒有嚴格的去實現(基于2階段提交,事務),我們認為這種策略在kafka中是沒有必要的.

        通常情況下"at-least-once"是我們搜選.(相比at most once而言,重復接收數據總比丟失數據要好).

        6. Replication

        kafka將每個partition數據復制到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定.leader處理所有的read-write請求,follower需要和leader保持同步.Follower和consumer一樣,消費消息并保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條消息保存成功,此消息才被認為是"committed",那么此時consumer才能消費它.即使只有一個replicas實例存活,仍然可以保證消息的正常發送和接收,只要zookeeper集群存活即可.(不同于其他分布式存儲,比如hbase需要"多數派"存活才行)

        當leader失效時,需在followers中選取出新的leader,可能此時follower落后于leader,因此需要選擇一個"up-to-date"的follower.選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡".

        7.Log

        如果一個topic的名稱為"my_topic",它有2個partitions,那么日志將會保存在my_topic_0和my_topic_1兩個目錄中;日志文件中保存了一序列"log entries"(日志條目),每個log entry格式為"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每個日志都有一個offset來唯一的標記一條消息,offset的值為8個字節的數字,表示此消息在此partition中所處的起始位置..每個partition在物理存儲層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.



    (摘自官網) 

        其中每個partiton中所持有的segments列表信息會存儲在zookeeper中.

        當segment文件尺寸達到一定閥值時(可以通過配置文件設定,默認1G),將會創建一個新的文件;當buffer中消息的條數達到閥值時將會觸發日志信息flush到日志文件中,同時如果"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日志文件.如果broker失效,極有可能會丟失那些尚未flush到文件的消息.因為server意外實現,仍然會導致log文件格式的破壞(文件尾部),那么就要求當server啟東是需要檢測最后一個segment的文件結構是否合法并進行必要的修復.

        獲取消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment文件,然后根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.

        日志文件的刪除策略非常簡單:啟動一個后臺線程定期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的創建時間).為了避免刪除文件時仍然有read操作(consumer消費),采取copy-on-write方式.

        8.Distribution

        kafka使用zookeeper來存儲一些meta信息,并使用了zookeeper watch機制來發現meta信息的變更并作出相應的動作(比如consumer失效,觸發負載均衡等)

        1) Broker node registry: 當一個kafka broker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.

        格式: /broker/ids/[0...N]   -->host:port;其中[0..N]表示broker id,每個broker的配置文件中都需要指定一個數字類型的id(全局不可重復),znode的值為此broker的host:port信息.

        2) Broker Topic Registry: 當一個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,仍然是一個臨時znode.

        格式: /broker/topics/[topic]/[0...N]  其中[0..N]表示partition索引號.

        3) Consumer and Consumer group: 每個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".

        一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能考慮,讓partition相對均衡的分散到每個consumer上.

        4) Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此id用來標記消費者信息.

        格式: /consumers/[group_id]/ids/[consumer_id]

        仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.

        5) Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.

        格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]   -->offset_value

        此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

        6) Partition Owner registry: 用來標記partition被哪個consumer消費.臨時znode

        格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]   -->consumer_node_id

        

        當consumer啟動時,所觸發的操作:

        A) 首先進行"Consumer id Registry";

        B) 然后在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).

        C) 在"Broker id registry"節點下,注冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.



     

        1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連接并發送消息.

        2) Broker端使用zookeeper用來注冊broker信息,已經監測partition leader存活性.

        3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,并和partition leader建立socket連接,并獲取消息.


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     

    posts - 17, comments - 65, trackbacks - 0, articles - 28

    Copyright © 陜西BOY

    主站蜘蛛池模板: 国产亚洲精品自在线观看| 波多野结衣中文一区二区免费| 精品亚洲综合久久中文字幕| 二级毛片免费观看全程| 亚洲福利视频一区二区| 免费大片黄在线观看| 亚洲最大av无码网址| 日本三级在线观看免费| 亚洲国产精品久久久天堂 | 人人爽人人爽人人片av免费| 亚洲人成电影网站国产精品| 巨胸喷奶水视频www免费视频| 亚洲日韩欧洲无码av夜夜摸| 久久精品免费观看国产| 亚洲国产成人精品青青草原| 成人无遮挡裸免费视频在线观看| 亚洲精品人成网在线播放影院| 午夜电影免费观看| 免费夜色污私人影院网站| 国产亚洲av片在线观看播放| 免费无码中文字幕A级毛片| 亚洲人成黄网在线观看| 国产高清免费在线| 国产做国产爱免费视频| 亚洲码一区二区三区| 精品少妇人妻AV免费久久洗澡| 一个人看的免费高清视频日本| 亚洲AV无码久久精品狠狠爱浪潮 | 好男人www免费高清视频在线| 亚洲av日韩综合一区久热| 国产亚洲精久久久久久无码77777| 99精品在线免费观看| 亚洲综合无码一区二区痴汉| 精品亚洲一区二区三区在线观看| 日本在线免费观看| 亚洲精品伦理熟女国产一区二区 | 亚洲国产精品婷婷久久| 免费观看a级毛片| 视频免费在线观看| 亚洲日韩一区二区一无码| 在线观看亚洲av每日更新|