Posted on 2018-11-30 16:05
為自己代言 閱讀(4204)
評論(0) 編輯 收藏 所屬分類:
消息中間件
以前一直沒有接觸過kafka 消息中間件,現在公司要用它來做消息服務(sub/pub),安裝都不多說了 主要是開發的時候遇到問題和解決方法:
版本: zookeeper-3.4.12.tar.gz kafka_2.12-2.1.0.tgz 連接工具: kafkatool_64bit.exe 集成: spring boot
pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
程序就集成:
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
public KafkaConsumer<String, String> getKafkaConsumer() {
return new KafkaConsumer<String, String>(consumerConfigs());
}
問題就在這里 KafkaConsumer 是讓spring IOC來管理,剛剛開始只有@Bean 生成的對象實例就只有一個,但是在啟動線程消息的時候只能一個對象一個線程,如果一個對象在啟用線程去消費會報
KafkaConsumer is not safe for multi-threaded access
解決辦法:
1.線程與KafkaConsumer對象實例的對應關系是1:1
2.要保證線程與KafkaConsumer對象的關系是固定不變的,也就是說,一個線程始終都只能操作同一個KafkaConsumer對象且一個KafkaConsumer對象始終是由同一個線程來操作的 所以在 @Bean 又加了 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS) 來每一次用生成一個新實例對象
2:問題
線程與KafkaConsumer對象實例的對應關系是1:1 ,但訂閱的對對象 和線程使用poll KafkaConsumer 對象又會發生變化導致監聽消費報錯
Consumer is not subscribed to any topics or assigned any partitions,為什么會報沒有定閱呢,明明已經定閱了
解決辦法不要讓spring IOC 來管理
KafkaConsumer 生成實例對象 使用new 方式生成。
看來了解下原理是很重要的以下是比較不錯的文章(里邊還有多線程消費源碼和原理講解)