kafka的設(shè)計(jì)初衷是希望作為一個(gè)統(tǒng)一的信息收集平臺(tái),能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力.
1.Persistence
kafka使用文件存儲(chǔ)消息,這就直接決定kafka在性能上嚴(yán)重依賴(lài)文件系統(tǒng)的本身特性.且無(wú)論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化幾乎沒(méi)有可能.文件緩存/直接內(nèi)存映射等是常用的手段.因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤(pán)檢索的開(kāi)支是較小的;同時(shí)為了減少磁盤(pán)寫(xiě)入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣減少了磁盤(pán)IO調(diào)用的次數(shù).
2.Efficiency
需要考慮的影響性能點(diǎn)很多,除磁盤(pán)IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問(wèn)題.kafka并沒(méi)有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來(lái),當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過(guò)消息量的大小可以通過(guò)配置文件來(lái)指定.對(duì)于kafka broker端,似乎有個(gè)sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無(wú)需進(jìn)程再次copy和交換.
其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開(kāi)支應(yīng)該都不大,因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過(guò)對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過(guò)壓縮.kafka支持gzip/snappy等多種壓縮方式.
3. Producer
Load balancing: producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過(guò)socket發(fā)送到broker,中間不會(huì)經(jīng)過(guò)任何"路由層".事實(shí)上,消息被路由到哪個(gè)partition上,有producer客戶(hù)端決定.比如可以采用"random""key-hash""輪詢(xún)"等,如果一個(gè)topic中有多個(gè)partitions,那么在producer端實(shí)現(xiàn)"消息均衡分發(fā)"是必要的.
其中partition leader的位置(host:port)注冊(cè)在zookeeper中,producer作為zookeeper client,已經(jīng)注冊(cè)了watch用來(lái)監(jiān)聽(tīng)partition leader的變更事件.
Asynchronous send: 將多條消息暫且在客戶(hù)端buffer起來(lái),并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率;不過(guò)這也有一定的隱患,比如當(dāng)producer失效時(shí),那些尚未發(fā)送的消息將會(huì)丟失.
4.Consumer
consumer端向broker發(fā)送"fetch"請(qǐng)求,并告知其獲取消息的offset;此后consumer將會(huì)獲得一定條數(shù)的消息;consumer端也可以重置offset來(lái)重新消費(fèi)消息.
在JMS實(shí)現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.不過(guò)在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動(dòng)去pull(或者說(shuō)fetch)消息;這中模式有些優(yōu)點(diǎn),首先consumer端可以根據(jù)自己的消費(fèi)能力適時(shí)的去fetch消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset);此外,消費(fèi)者可以良好的控制消息消費(fèi)的數(shù)量,batch fetch.
其他JMS實(shí)現(xiàn),消息消費(fèi)的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒(méi)有消費(fèi)成功的消息重發(fā)等,同時(shí)還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒(méi)有復(fù)雜的消息確認(rèn)機(jī)制,可見(jiàn)kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊(cè)offset.由此可見(jiàn),consumer客戶(hù)端也很輕量級(jí).
5.Message Delivery Semantics
對(duì)于JMS實(shí)現(xiàn),消息傳輸擔(dān)保非常直接:有且只有一次(exactly once).在kafka中稍有不同:
1) at most once: 最多一次,這個(gè)和JMS中"非持久化"消息類(lèi)似.發(fā)送一次,無(wú)論成敗,將不會(huì)重發(fā).
2) at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)重發(fā),直到接收成功.
3) exactly once: 消息只會(huì)發(fā)送一次.
at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后"未處理"的消息將不能被fetch到,這就是"at most once".
at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是"at least once".
exactly once: kafka中并沒(méi)有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交,事務(wù)),我們認(rèn)為這種策略在kafka中是沒(méi)有必要的.
通常情況下"at-least-once"是我們搜選.(相比at most once而言,重復(fù)接收數(shù)據(jù)總比丟失數(shù)據(jù)要好).
6. Replication
kafka將每個(gè)partition數(shù)據(jù)復(fù)制到多個(gè)server上,任何一個(gè)partition有一個(gè)leader和多個(gè)follower(可以沒(méi)有);備份的個(gè)數(shù)可以通過(guò)broker配置文件來(lái)設(shè)定.leader處理所有的read-write請(qǐng)求,follower需要和leader保持同步.Follower和consumer一樣,消費(fèi)消息并保存在本地日志中;leader負(fù)責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會(huì)把它從replicas同步列表中刪除.當(dāng)所有的follower都將一條消息保存成功,此消息才被認(rèn)為是"committed",那么此時(shí)consumer才能消費(fèi)它.即使只有一個(gè)replicas實(shí)例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.(不同于其他分布式存儲(chǔ),比如hbase需要"多數(shù)派"存活才行)
當(dāng)leader失效時(shí),需在followers中選取出新的leader,可能此時(shí)follower落后于leader,因此需要選擇一個(gè)"up-to-date"的follower.選擇follower時(shí)需要兼顧一個(gè)問(wèn)題,就是新leader server上所已經(jīng)承載的partition leader的個(gè)數(shù),如果一個(gè)server上有過(guò)多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負(fù)載均衡".
7.Log
如果一個(gè)topic的名稱(chēng)為"my_topic",它有2個(gè)partitions,那么日志將會(huì)保存在my_topic_0和my_topic_1兩個(gè)目錄中;日志文件中保存了一序列"log entries"(日志條目),每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容";每個(gè)日志都有一個(gè)offset來(lái)唯一的標(biāo)記一條消息,offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置..每個(gè)partition在物理存儲(chǔ)層面,有多個(gè)log file組成(稱(chēng)為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

(摘自官網(wǎng))
其中每個(gè)partiton中所持有的segments列表信息會(huì)存儲(chǔ)在zookeeper中.
當(dāng)segment文件尺寸達(dá)到一定閥值時(shí)(可以通過(guò)配置文件設(shè)定,默認(rèn)1G),將會(huì)創(chuàng)建一個(gè)新的文件;當(dāng)buffer中消息的條數(shù)達(dá)到閥值時(shí)將會(huì)觸發(fā)日志信息flush到日志文件中,同時(shí)如果"距離最近一次flush的時(shí)間差"達(dá)到閥值時(shí),也會(huì)觸發(fā)flush到日志文件.如果broker失效,極有可能會(huì)丟失那些尚未flush到文件的消息.因?yàn)閟erver意外實(shí)現(xiàn),仍然會(huì)導(dǎo)致log文件格式的破壞(文件尾部),那么就要求當(dāng)server啟東是需要檢測(cè)最后一個(gè)segment的文件結(jié)構(gòu)是否合法并進(jìn)行必要的修復(fù).
獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來(lái)表示消息的起始位置,chunk size用來(lái)表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.
日志文件的刪除策略非常簡(jiǎn)單:啟動(dòng)一個(gè)后臺(tái)線程定期掃描log file列表,把保存時(shí)間超過(guò)閥值的文件直接刪除(根據(jù)文件的創(chuàng)建時(shí)間).為了避免刪除文件時(shí)仍然有read操作(consumer消費(fèi)),采取copy-on-write方式.
8.Distribution
kafka使用zookeeper來(lái)存儲(chǔ)一些meta信息,并使用了zookeeper watch機(jī)制來(lái)發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動(dòng)作(比如consumer失效,觸發(fā)負(fù)載均衡等)
1) Broker node registry: 當(dāng)一個(gè)kafka broker啟動(dòng)后,首先會(huì)向zookeeper注冊(cè)自己的節(jié)點(diǎn)信息(臨時(shí)znode),同時(shí)當(dāng)broker和zookeeper斷開(kāi)連接時(shí),此znode也會(huì)被刪除.
格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每個(gè)broker的配置文件中都需要指定一個(gè)數(shù)字類(lèi)型的id(全局不可重復(fù)),znode的值為此broker的host:port信息.
2) Broker Topic Registry: 當(dāng)一個(gè)broker啟動(dòng)時(shí),會(huì)向zookeeper注冊(cè)自己持有的topic和partitions信息,仍然是一個(gè)臨時(shí)znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號(hào).
3) Consumer and Consumer group: 每個(gè)consumer客戶(hù)端被創(chuàng)建時(shí),會(huì)向zookeeper注冊(cè)自己的信息;此作用主要是為了"負(fù)載均衡".
一個(gè)group中的多個(gè)consumer可以交錯(cuò)的消費(fèi)一個(gè)topic的所有partitions;簡(jiǎn)而言之,保證此topic的所有partitions都能被此group所消費(fèi),且消費(fèi)時(shí)為了性能考慮,讓partition相對(duì)均衡的分散到每個(gè)consumer上.
4) Consumer id Registry: 每個(gè)consumer都有一個(gè)唯一的ID(host:uuid,可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息.
格式: /consumers/[group_id]/ids/[consumer_id]
仍然是一個(gè)臨時(shí)的znode,此節(jié)點(diǎn)的值為{"topic_name":#streams...},即表示此consumer目前所消費(fèi)的topic + partitions列表.
5) Consumer offset Tracking: 用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset.
格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] -->offset_value
此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個(gè)消費(fèi)者失效,其他consumer可以繼續(xù)消費(fèi).
6) Partition Owner registry: 用來(lái)標(biāo)記partition被哪個(gè)consumer消費(fèi).臨時(shí)znode
格式: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -->consumer_node_id
當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:
A) 首先進(jìn)行"Consumer id Registry";
B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

1) Producer端使用zookeeper用來(lái)"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.
2) Broker端使用zookeeper用來(lái)注冊(cè)broker信息,已經(jīng)監(jiān)測(cè)partition leader存活性.
3) Consumer端使用zookeeper用來(lái)注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來(lái)發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息.