<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    linkedin高吞吐量分布式消息系統kafka使用手記

    kafka是一種高吞吐量的分布式發布訂閱消息系統,她有如下特性:

    通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
    高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數十萬的消息。
    支持通過kafka服務器和消費機集群來分區消息。
    支持Hadoop并行數據加載。

    設計側重高吞吐量,用于好友動態,相關性統計,排行統計,訪問頻率控制,批處理等系統。大部分的消息中間件能夠處理實時性要求高的消息/數據,但是對于隊列中大量未處理的消息/數據在持久性方面比較弱。

    kakfa的consumer使用拉的方式工作。


    安裝kafka
    下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

    > tar xzf kafka-.tgz
    > cd kafka-
    > ./sbt update
    > ./sbt package
    啟動zkserver:
    bin/zookeeper-server-start.sh config/zookeeper.properties
    啟動server:
    bin/kafka-server-start.sh config/server.properties
    就是這么簡單。


    使用kafka
    import java.util.Arrays;  
    import java.util.List;  
    import java.util.Properties;  
    import kafka.javaapi.producer.SyncProducer;  
    import kafka.javaapi.message.ByteBufferMessageSet;  
    import kafka.message.Message;  
    import kafka.producer.SyncProducerConfig;  
      
      
      
    Properties props = new Properties();  
    props.put(“zk.connect”, “127.0.0.1:2181”);  
    props.put("serializer.class", "kafka.serializer.StringEncoder");  
    ProducerConfig config = new ProducerConfig(props);  
    Producer<String, String> producer = new Producer<String, String>(config);  
      
    Send a single message  
      
    // The message is sent to a randomly selected partition registered in ZK  
    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");  
    producer.send(data);      
      
    producer.close();  


    這樣就是一個標準的producer。

    consumer的代碼
    // specify some consumer properties  
    Properties props = new Properties();  
    props.put("zk.connect", "localhost:2181");  
    props.put("zk.connectiontimeout.ms", "1000000");  
    props.put("groupid", "test_group");  
      
    // Create the connection to the cluster  
    ConsumerConfig consumerConfig = new ConsumerConfig(props);  
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
      
    // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
    Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
        consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));  
    List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
      
    // create list of 4 threads to consume from each of the partitions   
    ExecutorService executor = Executors.newFixedThreadPool(4);  
      
    // consume the messages in the threads  
    for(final KafkaMessageStream<Message> stream: streams) {  
      executor.submit(new Runnable() {  
        public void run() {  
          for(Message message: stream) {  
            // process message  
          }   
        }  
      });  
    }  





    posted on 2013-09-08 17:32 paulwong 閱讀(568) 評論(0)  編輯  收藏 所屬分類: LOG ANALYST BIG DATA SYSTEM

    主站蜘蛛池模板: 精品亚洲av无码一区二区柚蜜| 久久精品国产亚洲精品| www视频在线观看免费| 十九岁在线观看免费完整版电影| 91免费在线视频| 四虎国产精品免费永久在线| 国产日韩AV免费无码一区二区| 国产精品极品美女自在线观看免费 | 黄色毛片视频免费| 国产午夜亚洲精品不卡| 免费在线观看自拍性爱视频| 一本久久免费视频| 99在线免费视频| 国产成年无码久久久免费| 国产99视频精品免费专区| 四虎影视成人永久免费观看视频 | 在线a亚洲老鸭窝天堂av高清| 色天使亚洲综合在线观看| 亚洲乱码日产精品一二三| 黑人粗长大战亚洲女2021国产精品成人免费视频| 亚洲AV无码成人网站在线观看| 精品免费AV一区二区三区| 一个人晚上在线观看的免费视频| 中文字幕免费在线看| 99久久免费中文字幕精品| 精品久久久久久久久免费影院| 热99re久久精品精品免费| 亚洲AV无码成人精品区大在线| 亚洲午夜无码久久久久| 日产亚洲一区二区三区| 涩涩色中文综合亚洲| 日韩a毛片免费观看| 免费一级不卡毛片| 欧美a级成人网站免费| 国产在线19禁免费观看| 亚洲色偷拍另类无码专区| 337p欧洲亚洲大胆艺术| 亚洲暴爽av人人爽日日碰| 久久免费观看视频| 久久久精品2019免费观看| 香蕉高清免费永久在线视频|