<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    linkedin高吞吐量分布式消息系統(tǒng)kafka使用手記

    kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),她有如下特性:

    通過(guò)O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能。
    高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)十萬(wàn)的消息。
    支持通過(guò)kafka服務(wù)器和消費(fèi)機(jī)集群來(lái)分區(qū)消息。
    支持Hadoop并行數(shù)據(jù)加載。

    設(shè)計(jì)側(cè)重高吞吐量,用于好友動(dòng)態(tài),相關(guān)性統(tǒng)計(jì),排行統(tǒng)計(jì),訪問(wèn)頻率控制,批處理等系統(tǒng)。大部分的消息中間件能夠處理實(shí)時(shí)性要求高的消息/數(shù)據(jù),但是對(duì)于隊(duì)列中大量未處理的消息/數(shù)據(jù)在持久性方面比較弱。

    kakfa的consumer使用拉的方式工作。


    安裝kafka
    下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

    > tar xzf kafka-.tgz
    > cd kafka-
    > ./sbt update
    > ./sbt package
    啟動(dòng)zkserver:
    bin/zookeeper-server-start.sh config/zookeeper.properties
    啟動(dòng)server:
    bin/kafka-server-start.sh config/server.properties
    就是這么簡(jiǎn)單。


    使用kafka
    import java.util.Arrays;  
    import java.util.List;  
    import java.util.Properties;  
    import kafka.javaapi.producer.SyncProducer;  
    import kafka.javaapi.message.ByteBufferMessageSet;  
    import kafka.message.Message;  
    import kafka.producer.SyncProducerConfig;  
      
      
      
    Properties props = new Properties();  
    props.put(“zk.connect”, “127.0.0.1:2181”);  
    props.put("serializer.class", "kafka.serializer.StringEncoder");  
    ProducerConfig config = new ProducerConfig(props);  
    Producer<String, String> producer = new Producer<String, String>(config);  
      
    Send a single message  
      
    // The message is sent to a randomly selected partition registered in ZK  
    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");  
    producer.send(data);      
      
    producer.close();  


    這樣就是一個(gè)標(biāo)準(zhǔn)的producer。

    consumer的代碼
    // specify some consumer properties  
    Properties props = new Properties();  
    props.put("zk.connect", "localhost:2181");  
    props.put("zk.connectiontimeout.ms", "1000000");  
    props.put("groupid", "test_group");  
      
    // Create the connection to the cluster  
    ConsumerConfig consumerConfig = new ConsumerConfig(props);  
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
      
    // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
    Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
        consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));  
    List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
      
    // create list of 4 threads to consume from each of the partitions   
    ExecutorService executor = Executors.newFixedThreadPool(4);  
      
    // consume the messages in the threads  
    for(final KafkaMessageStream<Message> stream: streams) {  
      executor.submit(new Runnable() {  
        public void run() {  
          for(Message message: stream) {  
            // process message  
          }   
        }  
      });  
    }  





    posted on 2013-09-08 17:32 paulwong 閱讀(568) 評(píng)論(0)  編輯  收藏 所屬分類: LOG ANALYST BIG DATA SYSTEM

    主站蜘蛛池模板: 国产成人精品亚洲一区| 在线电影你懂的亚洲| 亚洲乱码精品久久久久..| 久久久久亚洲AV无码专区首| 久久精品国产亚洲AV无码娇色| 亚洲中文字幕久久精品无码2021 | 最新69国产成人精品免费视频动漫 | 久久久久久久久久免免费精品| 男女作爱免费网站| 精品免费tv久久久久久久| 久久精品免费电影| 九九九精品成人免费视频| 日韩午夜免费视频| 亚洲欧洲日产国码av系列天堂| 亚洲国产高清视频| 一本色道久久88—综合亚洲精品| 亚洲精品无码专区在线播放| 免费人成网站永久| 香蕉免费看一区二区三区| 中文字幕乱码免费视频| 高清在线亚洲精品国产二区| 亚洲成AV人片在线观看无| 亚洲最新在线视频| 视频免费1区二区三区| 久久精品免费电影| 国产中文字幕免费| 久久精品国产亚洲网站| 亚洲 日韩经典 中文字幕 | www免费插插视频| 18禁黄网站禁片免费观看不卡| 亚洲一级毛片在线观| 男人的天堂亚洲一区二区三区| 区三区激情福利综合中文字幕在线一区亚洲视频1 | 亚洲视频免费观看| 成人免费午夜视频| 亚洲AV永久无码区成人网站| 亚洲 欧洲 视频 伦小说| 巨胸喷奶水www永久免费| 免费视频爱爱太爽了| 亚洲国产成人影院播放| 亚洲国产精品久久人人爱|