<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    xylz,imxylz

    關注后端架構、中間件、分布式和并發編程

       :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks
    Jafka 是一個開源的/性能良好的分布式消息系統。在上一篇文章中有所簡單介紹。
    下面是一篇簡單的入門文檔。更多詳細的文檔參考wiki。

    Step 1: 下載最新的安裝包

    完整的安裝指南在這里。
    最新的發行版地址在:https://github.com/adyliu/jafka/downloads

    $wget https://github.com/downloads/adyliu/jafka/jafka-1.0.tgz 
    $tar xzf jafka-1.0.tgz
    $cd jafka-1.0

    可選配置,設置一個環境變量。 $export $JAFKA_HOME=/opt/apps/jafka-1.0 以下假設所有操作目錄都在$JAFKA_HOME下。

    Step 2: 啟動服務端

    這里啟動一個單進程的服務端,使用默認的配置啟動即可。由于一些路徑使用了相對路徑,因此需要在jafka的主目錄下運行。

    $bash bin/server-single.sh config/server-single.properties 

    默認情況下,無需任何配置即可運行服務端。這時服務端會將9092端口綁定到所有網卡上。

    Step 3: 發送消息

    使用自帶的小命令行就可以發送簡單的文本消息。

    $bin/producer-console.sh --broker-list 0:localhost:9092 --topic demo 
    > Welcome to jafka
    > 中文中國

    producer-console.sh有一些參數,這可以通過執行下面的命令得到。 $bin/producer-console.sh

    發送消息只需要在提示符號'>'輸入文本即可,沒有出錯意味著發送成功,直接回車或者輸入CTRL+C退出程序。

    Step 4: 啟動消費者

    現在是時候消費剛才發送的消息。

    同樣Jafka自帶一個小程序能夠消費簡單的文本消息。

    $bin/simple-consumer-console.sh --topic demo --server jafka://localhost:9092 
    [1] 26: Welcome to jafka
    [2] 48: 中文中國

    連接上服務端后,立即就看到有消息消費了。默認情況下simple-consumer-console.sh輸出消息的序號(實際上不存在)以及消息的下一個偏移量(offset)。

    解壓縮后只需要執行上面三條命令就可以完成簡單的消息發送和接受演示。這就是一個簡單的消息系統。

    Step 5: 手動編碼

    我們希望利用提供的API手動編碼能夠發送和接受一些消息。

    消息發送者

    首先寫一個簡單的消息發送者。

    public static void main(String[] args) throws Exception {
        Properties props 
    = new Properties();
        props.put(
    "broker.list""0:127.0.0.1:9092");
        props.put(
    "serializer.class", StringEncoder.class.getName());
        
    //
        ProducerConfig config = new ProducerConfig(props);
        Producer
    <String, String> producer = new Producer<String, String>(config);
        
    //
        StringProducerData data = new StringProducerData("demo");
        
    for(int i=0;i<1000;i++) {
            data.add(
    "Hello world #"+i);
        }
        
    //
        try {
            
    long start = System.currentTimeMillis();
            
    for (int i = 0; i < 100; i++) {
                producer.send(data);
            }
            
    long cost = System.currentTimeMillis() - start;
            System.out.println(
    "send 100000 message cost: "+cost+" ms");
        } 
    finally {
            producer.close();
        }
    }

     

    看起來有點復雜,我們簡單分解下。

    配置參數

    首先需要配置服務端的地址。一個jfaka服務端地址格式如下:

    brokerId:host:port 
    • brokerId 用于標識服務進程,這在一個集群里面是全局唯一的
    • host/port 用戶描述服務監聽的ip地址和端口,默認情況下會在所有網卡的9092端口監聽數據。

    配置完服務端信息后,我們需要提供一個消息編碼。

    消息編碼用于將任意消息類型編碼成字節數組,這些字節數組就是我們的消息體。 

    默認情況下Jafka解析字節數組編碼,也就是原封不動的發送出去。這里簡單替換下,使用字符串UTF-8編碼。

    構造消息客戶端

    使用上面簡單的參數就可以構造出來一個簡單的消息發送客戶端。

    消息發送客戶端(Producer)用于管理與服務端之間的連接,并將消息按照指定的編碼方式發送給服務端。 

    構造消息

    用于使用字符串編碼,因此這里只能發送字符串的數據。每一個消息數據包都可以帶有多條消息,只需要滿足一個消息數據包的大小不超過默認的1M即可。比如下面就構造發往主題為demo的100條消息的數據包:

    StringProducerData data = new StringProducerData("demo");
    for(int i=0;i<1000;i++) {
    data.add("Hello world #"+i);
    }

    發送消息

    最后發送消息只需要調用producer.send()即可。上述例子中循環發送100次。

    下面是某次發送的結果:

    $bin/run-console.sh demo.client.StaticBrokerSender
    send 100000 message cost: 685 ms

    消息接受者

    接受消息的邏輯非常簡單,只需要配置服務端的地址,然后從偏移量0開始順序消費消息即可。

    下面的邏輯是簡單的將接受的消息以UTF-8的字符串展示。

    SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092);
    //
    long offset = 0;
    while (true) {
        FetchRequest request = new FetchRequest("test", 0, offset);
        for (MessageAndOffset msg : consumer.fetch(request)) {
            System.out.println(Utils.toString(msg.message.payload(), "UTF-8"));
            offset = msg.offset;
        }
    }


    整合ZooKeeper

    Jafka 使用zookeeper進行自動broker尋址以及消費者負載均衡。

    (1)啟動zookeeper服務

    測試時可以使用一個單進程的zookeeper用于替換zookeeper集群。

    $bin/zookeeper-server.sh config/zookeeper.properties 

    (2)啟動Jafka服務端

    $bin/server-single.sh config/server.properties 
    [2012-04-24 12:29:56,526] INFO Starting Jafka server (com.sohu.jafka.server.Server.java:68)
    [2012-04-24 12:29:56,532] INFO starting log cleaner every 60000 ms (com.sohu.jafka.log.LogManager.java:155)
    [2012-04-24 12:29:56,552] INFO connecting to zookeeper: 127.0.0.1:2181 (com.sohu.jafka.server.Zookeeper.java:80)
    [2012-04-24 12:29:56,568] INFO Starting ZkClient event thread. (com.github.zkclient.ZkEventThread.java:64)


    服務端啟動后自動向zookeeper注冊服務端的信息,例如ip地址、端口、已存在的消息等。

    (3)啟動消息發送者

    $bin/producer-console.sh --zookeeper localhost:2181 --topic demo
    Enter you message and exit with empty string.
    > Jafka second day
    > Jafka use zookeeper to search brokers and consumers                                       

    和上面啟動的消息發送者類似,只不過這里使用zookeeper配置自動尋找服務端,而不是指定服務端地址。

    (4)啟動消息接受者

    $bin/consumer-console.sh --zookeeper localhost:2181 --topic demo --from-beginning
    Jafka second day
    Jafka use zookeeper to search brokers and consumers

    這時候很快就看到剛才發送的消息了。

    由于使用zookeeper作為配置中心,因此可以啟動更多的服務端、消息發送者、消息接受者。只需要保證都連接zookeeper,并且所有的服務端都有唯一的brokerId(位于server.properties中).

    (5)API使用

    上面是使用自帶的程序發送簡單的文本消息。這里利用API來進行開發。

    發送消息

       public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");
            props.put("serializer.class", StringEncoder.class.getName());
            //
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
            //
            StringProducerData data = new StringProducerData("demo");
            for(int i=0;i<100;i++) {
                data.add("Hello world #"+i);
            }
            //
            try {
                long start = System.currentTimeMillis();
                for (int i = 0; i < 100; i++) {
                    producer.send(data);
                }
                long cost = System.currentTimeMillis() - start;
                System.out.println("send 10000 message cost: "+cost+" ms");
            } finally {
                producer.close();
            }
        }


    和不使用zookeeper的消息發送者對比,只需要將服務端配置信息替換成zookeeper連接地址即可。其它完全一致。

    接收消息

    接受消息看起來稍微有點復雜,簡單來說是如下幾步:

    • 配置zookeeper以及客戶端groupid
    • 與服務端的連接
    • 創建消息流
    • 啟動線程池消費消息


    public static void main(String[] args) throws Exception {

        Properties props 
    = new Properties();
        props.put(
    "zk.connect""localhost:2181");
        props.put(
    "groupid""test_group");
        
    //
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector connector 
    = Consumer.create(consumerConfig);
        
    //
        Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(ImmutableMap.of("demo"2), new StringDecoder());
        List
    <MessageStream<String>> streams = topicMessageStreams.get("demo");
        
    //
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
    final AtomicInteger count = new AtomicInteger();
        
    for (final MessageStream<String> stream : streams) {
            executor.submit(
    new Runnable() {

                
    public void run() {
                    
    for (String message : stream) {
                        System.out.println(count.incrementAndGet() 
    + " => " + message);
                    }
                }
            });
        }
        
    //
        executor.awaitTermination(1, TimeUnit.HOURS);
    所有消息的消費方式幾乎都相同,只是消費的topic名稱不同而已。

     


    是不是很簡單,動手試試吧

    https://github.com/adyliu/jafka/wiki/quickstart.zh_CN


    ©2009-2014 IMXYLZ |求賢若渴
    posted on 2012-05-11 18:48 imxylz 閱讀(9760) 評論(4)  編輯  收藏 所屬分類: Jafka

    評論

    # re: 分布式消息系統jafka快速起步 2012-05-17 15:50 樂百事
    希望提交至maven中央倉庫  回復  更多評論
      

    # re: 分布式消息系統jafka快速起步 2012-05-17 18:51 imxylz
    @樂百事
    已經提交到Maven中央倉庫  回復  更多評論
      

    # re: 分布式消息系統jafka快速起步 2012-05-18 15:26 樂百事
    中央倉庫怎么搜不到啊;

    pom dependency?  回復  更多評論
      

    # re: 分布式消息系統jafka快速起步 2012-05-18 15:55 imxylz
    http://search.maven.org/#search%7Cga%7C1%7Cjafka
    or
    http://search.maven.org/#search%7Cga%7C1%7Ccom.sohu
    or
    http://repo1.maven.org/maven2/com/sohu/jafka/jafka/1.0/  回復  更多評論
      


    ©2009-2014 IMXYLZ
    主站蜘蛛池模板: 色欲A∨无码蜜臀AV免费播| 免费看美女让人桶尿口| 亚洲精品亚洲人成在线播放| 成人毛片免费网站| 久久久久国色AV免费观看| 久久亚洲中文字幕精品有坂深雪| 成人免费无码大片a毛片软件| 国产精品免费久久久久影院| 久久久久亚洲AV无码专区体验| 午夜精品在线免费观看| 中国videos性高清免费| 色噜噜亚洲男人的天堂| 在线观看亚洲精品福利片| 91情侣在线精品国产免费| 五月天国产成人AV免费观看| 亚洲国产精品综合久久网各| 亚洲国产V高清在线观看| 国产精品免费网站| 本免费AV无码专区一区| 亚洲欧美国产国产一区二区三区| 亚洲成AV人片在线观看无| 国产成人在线观看免费网站| 国产成人精品无码免费看| 美女又黄又免费的视频| 亚洲一级大黄大色毛片| 亚洲AV无码成人精品区天堂| 日韩毛片无码永久免费看| 99国产精品免费观看视频| 国产黄片不卡免费| 亚洲精品第一国产综合亚AV| 亚洲网站免费观看| 亚洲av永久无码精品网站| 亚洲AV无码一区二三区| 成年免费大片黄在线观看岛国| 97人妻精品全国免费视频| 欧洲美女大片免费播放器视频| 亚洲欧洲无卡二区视頻| 亚洲成人高清在线观看| 亚洲欧洲日韩国产综合在线二区| 国产成人亚洲综合无码| 四虎影视在线永久免费看黄|