<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-314  評(píng)論-209  文章-0  trackbacks-0
    轉(zhuǎn)自:http://www.cnblogs.com/cyfonly/p/5954614.html

    一、為什么需要消息系統(tǒng)

    復(fù)制代碼
    1.解耦:   允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。 2.冗余:   消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過(guò)這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。 3.擴(kuò)展性:   因?yàn)橄㈥?duì)列解耦了你的處理過(guò)程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過(guò)程即可。 4.靈活性 & 峰值處理能力:   在訪問(wèn)量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見(jiàn)。如果為以能處理這類(lèi)峰值訪問(wèn)為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。 5.可恢復(fù)性:   系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。 6.順序保證:   在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來(lái)就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來(lái)處理。(Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性) 7.緩沖:   有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。 8.異步通信:   很多時(shí)候,用戶(hù)不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶(hù)把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。
    復(fù)制代碼

     

    二、kafka 架構(gòu)

    2.1 拓?fù)浣Y(jié)構(gòu)

    如下圖:

    圖.1

    2.2 相關(guān)概念

    如圖.1中,kafka 相關(guān)名詞解釋如下:

    復(fù)制代碼
    1.producer:   消息生產(chǎn)者,發(fā)布消息到 kafka 集群的終端或服務(wù)。 2.broker:   kafka 集群中包含的服務(wù)器。 3.topic:   每條發(fā)布到 kafka 集群的消息屬于的類(lèi)別,即 kafka 是面向 topic 的。 4.partition:   partition 是物理上的概念,每個(gè) topic 包含一個(gè)或多個(gè) partition。kafka 分配的單位是 partition。 5.consumer:   從 kafka 集群中消費(fèi)消息的終端或服務(wù)。 6.Consumer group:   high-level consumer API 中,每個(gè) consumer 都屬于一個(gè) consumer group,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi),但可以被多個(gè) consumer group 消費(fèi)。 7.replica:   partition 的副本,保障 partition 的高可用。 8.leader:   replica 中的一個(gè)角色, producer 和 consumer 只跟 leader 交互。 9.follower:   replica 中的一個(gè)角色,從 leader 中復(fù)制數(shù)據(jù)。 10.controller:   kafka 集群中的其中一個(gè)服務(wù)器,用來(lái)進(jìn)行 leader election 以及 各種 failover。 12.zookeeper:   kafka 通過(guò) zookeeper 來(lái)存儲(chǔ)集群的 meta 信息。
    復(fù)制代碼

    2.3 zookeeper 節(jié)點(diǎn)

    kafka 在 zookeeper 中的存儲(chǔ)結(jié)構(gòu)如下圖所示:

     

    圖.2

     

    三、producer 發(fā)布消息

    3.1 寫(xiě)入方式

    producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中,屬于順序?qū)懘疟P(pán)(順序?qū)懘疟P(pán)效率比隨機(jī)寫(xiě)內(nèi)存要高,保障 kafka 吞吐率)。

    3.2 消息路由

    producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition。其路由機(jī)制為:

    1. 指定了 patition,則直接使用; 2. 未指定 patition 但指定 key,通過(guò)對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition 3. patition 和 key 都未指定,使用輪詢(xún)選出一個(gè) patition。

     附上 java 客戶(hù)端分區(qū)源碼,一目了然:

    復(fù)制代碼
    //創(chuàng)建消息實(shí)例 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {      if (topic == null)           throw new IllegalArgumentException("Topic cannot be null");      if (timestamp != null && timestamp < 0)           throw new IllegalArgumentException("Invalid timestamp " + timestamp);      this.topic = topic;      this.partition = partition;      this.key = key;      this.value = value;      this.timestamp = timestamp; }  //計(jì)算 patition,如果指定了 patition 則直接使用,否則使用 key 計(jì)算 private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {      Integer partition = record.partition();      if (partition != null) {           List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());           int lastPartition = partitions.size() - 1;           if (partition < 0 || partition > lastPartition) {                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));           }           return partition;      }      return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }  // 使用 key 選取 patition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);      int numPartitions = partitions.size();      if (keyBytes == null) {           int nextValue = counter.getAndIncrement();           List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);           if (availablePartitions.size() > 0) {                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();                return availablePartitions.get(part).partition();           } else {                return DefaultPartitioner.toPositive(nextValue) % numPartitions;           }      } else {           //對(duì) keyBytes 進(jìn)行 hash 選出一個(gè) patition           return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;      } }
    復(fù)制代碼

    3.3 寫(xiě)入流程

     producer 寫(xiě)入消息序列圖如下所示:

    圖.3

    流程說(shuō)明:

    復(fù)制代碼
    1. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader 2. producer 將消息發(fā)送給該 leader 3. leader 將消息寫(xiě)入本地 log 4. followers 從 leader pull 消息,寫(xiě)入本地 log 后 leader 發(fā)送 ACK 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
    復(fù)制代碼

    3.4 producer delivery guarantee

     一般情況下存在三種情況:

    1. At most once 消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸 2. At least one 消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸 3. Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次

    當(dāng) producer 向 broker 發(fā)送消息時(shí),一旦這條消息被 commit,由于 replication 的存在,它就不會(huì)丟。但是如果 producer 發(fā)送數(shù)據(jù)給 broker 后,遇到網(wǎng)絡(luò)問(wèn)題而造成通信中斷,那 Producer 就無(wú)法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無(wú)法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是 producer 可以生成一種類(lèi)似于主鍵的東西,發(fā)生故障時(shí)冪等性的重試多次,這樣就做到了 Exactly once,但目前還并未實(shí)現(xiàn)。所以目前默認(rèn)情況下一條消息從 producer 到 broker 是確保了 At least once,可通過(guò)設(shè)置 producer 異步發(fā)送實(shí)現(xiàn)At most once。

     

    四、broker 保存消息

    4.1 存儲(chǔ)方式

    物理上把 topic 分成一個(gè)或多個(gè) patition(對(duì)應(yīng) server.properties 中的 num.partitions=3 配置),每個(gè) patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文件),如下:

     

    圖.4

    4.2 存儲(chǔ)策略

    無(wú)論消息是否被消費(fèi),kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

    1. 基于時(shí)間:log.retention.hours=168 2. 基于大小:log.retention.bytes=1073741824

    需要注意的是,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無(wú)關(guān),所以這里刪除過(guò)期文件與提高 Kafka 性能無(wú)關(guān)。

    4.3 topic 創(chuàng)建與刪除

    4.3.1 創(chuàng)建 topic

    創(chuàng)建 topic 的序列圖如下所示:

    圖.5

    流程說(shuō)明:

    復(fù)制代碼
    1. controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher,當(dāng) topic 被創(chuàng)建,則 controller 會(huì)通過(guò) watch 得到該 topic 的 partition/replica 分配。 2. controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表,對(duì)于 set_p 中的每一個(gè) partition: 	2.1 從分配給該 partition 的所有 replica(稱(chēng)為AR)中任選一個(gè)可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR 	2.2 將新的 leader 和 ISR 寫(xiě)入 /brokers/topics/[topic]/partitions/[partition]/state 3. controller 通過(guò) RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest。
    復(fù)制代碼

    4.3.2 刪除 topic

    刪除 topic 的序列圖如下所示:

    圖.6

    流程說(shuō)明:

    1. controller 在 zooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher,當(dāng) topic 被刪除,則 controller 會(huì)通過(guò) watch 得到該 topic 的 partition/replica 分配。 2. 若 delete.topic.enable=false,結(jié)束;否則 controller 注冊(cè)在 /admin/delete_topics 上的 watch 被 fire,controller 通過(guò)回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。

     

    五、kafka HA

    5.1 replication

    如圖.1所示,同一個(gè) partition 可能會(huì)有多個(gè) replica(對(duì)應(yīng) server.properties 配置中的 default.replication.factor=N)。沒(méi)有 replica 的情況下,一旦 broker 宕機(jī),其上所有 patition 的數(shù)據(jù)都不可被消費(fèi),同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition。引入replication 之后,同一個(gè) partition 可能會(huì)有多個(gè) replica,而這時(shí)需要在這些 replica 之間選出一個(gè) leader,producer 和 consumer 只與這個(gè) leader 交互,其它 replica 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。

    Kafka 分配 Replica 的算法如下:

    1. 將所有 broker(假設(shè)共 n 個(gè) broker)和待分配的 partition 排序 2. 將第 i 個(gè) partition 分配到第(i mod n)個(gè) broker 上 3. 將第 i 個(gè) partition 的第 j 個(gè) replica 分配到第((i + j) mode n)個(gè) broker上

    5.2 leader failover

    當(dāng) partition 對(duì)應(yīng)的 leader 宕機(jī)時(shí),需要從 follower 中選舉出新 leader。在選舉新leader時(shí),一個(gè)基本的原則是,新的 leader 必須擁有舊 leader commit 過(guò)的所有消息。

    kafka 在 zookeeper 中(/brokers/.../state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas),由3.3節(jié)的寫(xiě)入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader。對(duì)于 f+1 個(gè) replica,一個(gè) partition 可以在容忍 f 個(gè) replica 失效的情況下保證消息不丟失。

    當(dāng)所有 replica 都不工作時(shí),有兩種可行的方案:

    1. 等待 ISR 中的任一個(gè) replica 活過(guò)來(lái),并選它作為 leader。可保障數(shù)據(jù)不丟失,但時(shí)間可能相對(duì)較長(zhǎng)。 2. 選擇第一個(gè)活過(guò)來(lái)的 replica(不一定是 ISR 成員)作為 leader。無(wú)法保障數(shù)據(jù)不丟失,但相對(duì)不可用時(shí)間較短。

    kafka 0.8.* 使用第二種方式。

    kafka 通過(guò) Controller 來(lái)選舉 leader,流程請(qǐng)參考5.3節(jié)。

    5.3 broker failover

    kafka broker failover 序列圖如下所示:

    圖.7

    流程說(shuō)明: 

    復(fù)制代碼
    1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊(cè) Watcher,當(dāng) broker 宕機(jī)時(shí) zookeeper 會(huì) fire watch 2. controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker 3. controller決定set_p,該集合包含宕機(jī) broker 上的所有 partition 4. 對(duì) set_p 中的每一個(gè) partition     4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR     4.2 決定新 leader(如4.3節(jié)所描述)     4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫(xiě)入 state 節(jié)點(diǎn) 5. 通過(guò) RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令
    復(fù)制代碼

    5.4 controller failover

     當(dāng) controller 宕機(jī)時(shí)會(huì)觸發(fā) controller failover。每個(gè) broker 都會(huì)在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊(cè) watcher,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失,所有存活的 broker 收到 fire 的通知,每個(gè) broker 都嘗試創(chuàng)建新的 controller path,只有一個(gè)競(jìng)選成功并當(dāng)選為 controller。

    當(dāng)新的 controller 當(dāng)選時(shí),會(huì)觸發(fā) KafkaController.onControllerFailover 方法,在該方法中完成如下操作:

    復(fù)制代碼
    1. 讀取并增加 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊(cè) watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊(cè) watcher。 4. 通過(guò) partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊(cè) watcher。 5. 若 delete.topic.enable=true(默認(rèn)值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊(cè) watcher。 6. 通過(guò) replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊(cè)Watch。 7. 初始化 ControllerContext 對(duì)象,設(shè)置當(dāng)前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。 8. 啟動(dòng) replicaStateMachine 和 partitionStateMachine。 9. 將 brokerState 狀態(tài)設(shè)置為 RunningAsController。 10. 將每個(gè) partition 的 Leadership 信息發(fā)送給所有“活”著的 broker。 11. 若 auto.leader.rebalance.enable=true(默認(rèn)值是true),則啟動(dòng) partition-rebalance 線程。 12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic。
    復(fù)制代碼

     

    6. consumer 消費(fèi)消息

    6.1 consumer API

    kafka 提供了兩套 consumer API:

    1. The high-level Consumer API 2. The SimpleConsumer API

     其中 high-level consumer API 提供了一個(gè)從 kafka 消費(fèi)數(shù)據(jù)的高層抽象,而 SimpleConsumer API 則需要開(kāi)發(fā)人員更多地關(guān)注細(xì)節(jié)。

    6.1.1 The high-level consumer API

    high-level consumer API 提供了 consumer group 的語(yǔ)義,一個(gè)消息只能被 group 內(nèi)的一個(gè) consumer 所消費(fèi),且 consumer 消費(fèi)消息時(shí)不關(guān)注 offset,最后一個(gè) offset 由 zookeeper 保存。

    使用 high-level consumer API 可以是多線程的應(yīng)用,應(yīng)當(dāng)注意:

    1. 如果消費(fèi)線程大于 patition 數(shù)量,則有些線程將收不到消息 2. 如果 patition 數(shù)量大于線程數(shù),則有些線程多收到多個(gè) patition 的消息 3. 如果一個(gè)線程消費(fèi)多個(gè) patition,則無(wú)法保證你收到的消息的順序,而一個(gè) patition 內(nèi)的消息是有序的

    6.1.2 The SimpleConsumer API

    如果你想要對(duì) patition 有更多的控制權(quán),那就應(yīng)該使用 SimpleConsumer API,比如:

    1. 多次讀取一個(gè)消息 2. 只消費(fèi)一個(gè) patition 中的部分消息 3. 使用事務(wù)來(lái)保證一個(gè)消息僅被消費(fèi)一次

     但是使用此 API 時(shí),partition、offset、broker、leader 等對(duì)你不再透明,需要自己去管理。你需要做大量的額外工作:

    1. 必須在應(yīng)用程序中跟蹤 offset,從而確定下一條應(yīng)該消費(fèi)哪條消息 2. 應(yīng)用程序需要通過(guò)程序獲知每個(gè) Partition 的 leader 是誰(shuí) 3. 需要處理 leader 的變更

     使用 SimpleConsumer API 的一般流程如下:

    復(fù)制代碼
    1. 查找到一個(gè)“活著”的 broker,并且找出每個(gè) partition 的 leader 2. 找出每個(gè) partition 的 follower 3. 定義好請(qǐng)求,該請(qǐng)求應(yīng)該能描述應(yīng)用程序需要哪些數(shù)據(jù) 4. fetch 數(shù)據(jù) 5. 識(shí)別 leader 的變化,并對(duì)之作出必要的響應(yīng)
    復(fù)制代碼

    以下針對(duì) high-level Consumer API 進(jìn)行說(shuō)明。

    6.2 consumer group

    如 2.2 節(jié)所說(shuō), kafka 的分配單位是 patition。每個(gè) consumer 都屬于一個(gè) group,一個(gè) partition 只能被同一個(gè) group 內(nèi)的一個(gè) consumer 所消費(fèi)(也就保障了一個(gè)消息只能被 group 內(nèi)的一個(gè) consuemr 所消費(fèi)),但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition。

    kafka 的設(shè)計(jì)目標(biāo)之一就是同時(shí)實(shí)現(xiàn)離線處理和實(shí)時(shí)處理,根據(jù)這一特性,可以使用 spark/Storm 這些實(shí)時(shí)處理系統(tǒng)對(duì)消息在線處理,同時(shí)使用 Hadoop 批處理系統(tǒng)進(jìn)行離線處理,還可以將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)中心,只需要保證這三者屬于不同的 consumer group。如下圖所示:

     

    圖.8

    6.3 消費(fèi)方式

    consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)。

    push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。

    對(duì)于 Kafka 而言,pull 模式更合適,它可簡(jiǎn)化 broker 的設(shè)計(jì),consumer 可自主控制消費(fèi)消息的速率,同時(shí) consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。

    6.4 consumer delivery guarantee

    如果將 consumer 設(shè)置為 autocommit,consumer 一旦讀到數(shù)據(jù)立即自動(dòng) commit。如果只討論這一讀取消息的過(guò)程,那 Kafka 確保了 Exactly once。

    但實(shí)際使用中應(yīng)用程序并非在 consumer 讀取完數(shù)據(jù)就結(jié)束了,而是要進(jìn)行進(jìn)一步處理,而數(shù)據(jù)處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:

    復(fù)制代碼
    1.讀完消息先 commit 再處理消息。     這種模式下,如果 consumer 在 commit 后還沒(méi)來(lái)得及處理消息就 crash 了,下次重新開(kāi)始工作后就無(wú)法讀到剛剛已提交而未處理的消息,這就對(duì)應(yīng)于 At most once 2.讀完消息先處理再 commit。     這種模式下,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開(kāi)始工作時(shí)還會(huì)處理剛剛未 commit 的消息,實(shí)際上該消息已經(jīng)被處理過(guò)了。這就對(duì)應(yīng)于 At least once。 3.如果一定要做到 Exactly once,就需要協(xié)調(diào) offset 和實(shí)際操作的輸出。     精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個(gè)地方,會(huì)更簡(jiǎn)潔和通用。這種方式可能更好,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交。比如,consumer 拿到數(shù)據(jù)后可能把數(shù)據(jù)放到 HDFS,如果把最新的 offset 和數(shù)據(jù)本身一起寫(xiě)到 HDFS,那就可以保證數(shù)據(jù)的輸出和 offset 的更新要么都完成,要么都不完成,間接實(shí)現(xiàn) Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,無(wú)法存于HDFS,而SimpleConsuemr API的 offset 是由自己去維護(hù)的,可以將之存于 HDFS 中)
    復(fù)制代碼

    總之,Kafka 默認(rèn)保證 At least once,并且允許通過(guò)設(shè)置 producer 異步提交來(lái)實(shí)現(xiàn) At most once(見(jiàn)文章《kafka consumer防止數(shù)據(jù)丟失》)。而 Exactly once 要求與外部存儲(chǔ)系統(tǒng)協(xié)作,幸運(yùn)的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。

    更多關(guān)于 kafka 傳輸語(yǔ)義的信息請(qǐng)參考《Message Delivery Semantics》。

    6.5 consumer rebalance

    當(dāng)有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時(shí)會(huì)觸發(fā) rebalance。consumer rebalance算法如下:

    復(fù)制代碼
    1. 將目標(biāo) topic 下的所有 partirtion 排序,存于PT 2. 對(duì)某 consumer group 下所有 consumer 排序,存于 CG,第 i 個(gè)consumer 記為 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 對(duì)原來(lái)分配的 partition 的消費(fèi)權(quán)(i從0開(kāi)始) 5. 將第i*N到(i+1)*N-1個(gè) partition 分配給 Ci
    復(fù)制代碼

    在 0.8.*版本,每個(gè) consumer 都只負(fù)責(zé)調(diào)整自己所消費(fèi)的 partition,為了保證整個(gè)consumer group 的一致性,當(dāng)一個(gè) consumer 觸發(fā)了 rebalance 時(shí),該 consumer group 內(nèi)的其它所有其它 consumer 也應(yīng)該同時(shí)觸發(fā) rebalance。這會(huì)導(dǎo)致以下幾個(gè)問(wèn)題:

    復(fù)制代碼
    1.Herd effect   任何 broker 或者 consumer 的增減都會(huì)觸發(fā)所有的 consumer 的 rebalance 2.Split Brain   每個(gè) consumer 分別單獨(dú)通過(guò) zookeeper 判斷哪些 broker 和 consumer 宕機(jī)了,那么不同 consumer 在同一時(shí)刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會(huì)造成不正確的 reblance 嘗試。 3. 調(diào)整結(jié)果不可控   所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,這可能會(huì)導(dǎo)致 kafka 工作在一個(gè)不正確的狀態(tài)。
    復(fù)制代碼

    基于以上問(wèn)題,kafka 設(shè)計(jì)者考慮在0.9.*版本開(kāi)始使用中心 coordinator 來(lái)控制 consumer rebalance,然后又從簡(jiǎn)便性和驗(yàn)證要求兩方面考慮,計(jì)劃在 consumer 客戶(hù)端實(shí)現(xiàn)分配方案。(見(jiàn)文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此處不再贅述。

     

    七、注意事項(xiàng)

    7.1 producer 無(wú)法發(fā)送消息的問(wèn)題

    最開(kāi)始在本機(jī)搭建了kafka偽集群,本地 producer 客戶(hù)端成功發(fā)布消息至 broker。隨后在服務(wù)器上搭建了 kafka 集群,在本機(jī)連接該集群,producer 卻無(wú)法發(fā)布消息到 broker(奇怪也沒(méi)有拋錯(cuò))。最開(kāi)始懷疑是 iptables 沒(méi)開(kāi)放,于是開(kāi)放端口,結(jié)果還不行(又開(kāi)始是代碼問(wèn)題、版本問(wèn)題等等,倒騰了很久)。最后沒(méi)辦法,一項(xiàng)一項(xiàng)查看 server.properties 配置,發(fā)現(xiàn)以下兩個(gè)配置:

    復(fù)制代碼
    # The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: #     listeners = security_protocol://host_name:port #   EXAMPLE: #     listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092

     # Hostname and port the broker will advertise to producers and consumers. If not set, 
     # it uses the value for "listeners" if configured. Otherwise, it will use the value
     # returned from java.net.InetAddress.getCanonicalHostName().
     #advertised.listeners=PLAINTEXT://your.host.name:9092

    復(fù)制代碼

    以上說(shuō)的就是 advertised.listeners 是 broker 給 producer 和 consumer 連接使用的,如果沒(méi)有設(shè)置,就使用 listeners,而如果 host_name 沒(méi)有設(shè)置的話,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機(jī)名。

    修改方法:

    1. listeners=PLAINTEXT://121.10.26.XXX:9092 2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

    修改后重啟服務(wù),正常工作。關(guān)于更多 kafka 配置說(shuō)明,見(jiàn)文章《Kafka學(xué)習(xí)整理三(borker(0.9.0及0.10.0)配置)》。

     

    八、參考文章

    1. 《Kafka剖析(一):Kafka背景及架構(gòu)介紹

    2. 《Kafka設(shè)計(jì)解析(二):Kafka High Availability (上)

    3. 《Kafka設(shè)計(jì)解析(二):Kafka High Availability (下)

    4. 《Kafka設(shè)計(jì)解析(四):Kafka Consumer解析

    5. 《Kafka設(shè)計(jì)解析(五):Kafka Benchmark

    6. 《Kafka學(xué)習(xí)整理三(borker(0.9.0及0.10.0)配置)

    7. 《Using the High Level Consumer

    8. 《Using SimpleConsumer

    9. 《Consumer Client Re-Design

    10. 《Message Delivery Semantics

    11. 《Kafka Detailed Consumer Coordinator Design

    12. 《Kafka Client-side Assignment Proposal

    13. 《Kafka和DistributedLog技術(shù)對(duì)比

    14. 《kafka安裝和啟動(dòng)

    15. 《kafka consumer防止數(shù)據(jù)丟失

      

     

    作者:cyfonly
    本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,未經(jīng)同意須保留此段聲明,且在文章頁(yè)面明顯位置給出原文連接。歡迎指正與交流。
    posted on 2017-04-28 10:37 xzc 閱讀(318) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): hadoop
    主站蜘蛛池模板: 久久亚洲私人国产精品| 国产午夜不卡AV免费| 在线观看亚洲AV每日更新无码 | 女bbbbxxxx另类亚洲| 涩涩色中文综合亚洲| 亚洲一卡2卡三卡4卡无卡下载 | 成全视频免费高清 | 最近中文字幕无吗高清免费视频| 一级做a爰片久久免费| 日韩在线视频线视频免费网站| 亚洲人妖女同在线播放| 91亚洲视频在线观看| 亚洲乱码在线观看| 内射干少妇亚洲69XXX| 亚洲综合无码AV一区二区| 亚洲欧洲日产国码无码久久99| 国产精品va无码免费麻豆| 999国内精品永久免费视频| 99re热免费精品视频观看| 免费高清av一区二区三区| 国产va免费精品| 亚洲国产一区在线观看| 激情综合亚洲色婷婷五月| 4444亚洲国产成人精品| 亚洲一区电影在线观看| 亚洲6080yy久久无码产自国产| 亚洲三级中文字幕| 亚洲毛片免费视频| 亚洲成av人无码亚洲成av人| ww亚洲ww在线观看国产| 亚洲毛片无码专区亚洲乱| 亚洲熟女综合色一区二区三区| 亚洲国产夜色在线观看| 亚洲黄色一级毛片| 亚洲国产精品久久久久秋霞小| 2020天堂在线亚洲精品专区| 美女啪啪网站又黄又免费| 在线观看免费无码专区| 久久WWW色情成人免费观看| 亚洲国产精品视频| 亚洲高清视频免费|