Kafka is a high-throughput distributed publish-subscribe messaging system. It has the following features:
Message persistence through an O(1) disk data structure, which keeps performance stable over long periods even with terabytes of stored messages.
High throughput: even on very ordinary hardware, Kafka can handle hundreds of thousands of messages per second.
Support for partitioning messages across Kafka servers and clusters of consuming machines.
Support for loading data into Hadoop in parallel.
The design emphasizes high throughput and targets systems such as activity feeds, relevance statistics, ranking statistics, rate limiting, and batch processing. Most message brokers handle messages/data with tight real-time requirements well, but are weak at persisting large backlogs of unprocessed messages/data sitting in the queue.
Kafka's consumer works in a pull fashion, as the consumer example below illustrates.
To install Kafka, download: http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-<VERSION>.tgz
> cd kafka-<VERSION>
> ./sbt update
> ./sbt package
Start the ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server:
bin/kafka-server-start.sh config/server.properties
That's all it takes.
Using Kafka
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

// The producer discovers brokers through ZooKeeper via zk.connect;
// serializer.class tells it how to turn each String into a Kafka message.
Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// Send a single message.
// The message is sent to a randomly selected partition registered in ZK.
ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
That is a complete, standard producer.
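The producer above pushes one message per send() call. If you want to hand several messages for a topic to the producer at once, the sketch below shows one way to do it; it is a minimal example that assumes, as an unverified detail of the 0.7 javaapi, that ProducerData also accepts a List of messages, and the class name BatchProducerExample is only for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class BatchProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "127.0.0.1:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Assumption: ProducerData can wrap a list of messages for one topic,
        // so a single send() ships the whole batch to a partition of "test-topic".
        List<String> messages = Arrays.asList("msg-1", "msg-2", "msg-3");
        producer.send(new ProducerData<String, String>("test-topic", messages));
        producer.close();
    }
}

Batching amortizes the per-request overhead, which is in line with the high-throughput focus described at the top of this article.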
Now the consumer code:
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic "test", to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");

// create a pool of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for (final KafkaMessageStream<Message> stream : streams) {
    executor.submit(new Runnable() {
        public void run() {
            for (Message message : stream) {
                // process message
            }
        }
    });
}
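The loop above never ends on its own: the stream iterator blocks until new data arrives, which is exactly the pull model mentioned earlier. A minimal sketch of the remaining plumbing follows; it assumes that Message exposes its body as a ByteBuffer through payload() and that ConsumerConnector has a shutdown() method (both worth verifying against your Kafka version), and ConsumerHelpers is just an illustrative name.

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class ConsumerHelpers {

    // Turn a Kafka Message into a UTF-8 string.
    // Assumption: Message.payload() returns a ByteBuffer holding the message body.
    public static String toText(Message message) throws UnsupportedEncodingException {
        ByteBuffer payload = message.payload();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return new String(bytes, "UTF-8");
    }

    // Stop the worker threads first, then close the connector so the
    // ZooKeeper session and broker connections are released.
    public static void shutdown(ExecutorService executor, ConsumerConnector connector)
            throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        connector.shutdown();
    }
}

Inside the Runnable above, the worker would call ConsumerHelpers.toText(message) in place of the placeholder comment, and the main thread would call ConsumerHelpers.shutdown(executor, consumerConnector) when the application exits.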