<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    #

    linkedin高吞吐量分布式消息系統kafka使用手記

    kafka是一種高吞吐量的分布式發布訂閱消息系統,她有如下特性:

    通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
    高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數十萬的消息。
    支持通過kafka服務器和消費機集群來分區消息。
    支持Hadoop并行數據加載。

    設計側重高吞吐量,用于好友動態,相關性統計,排行統計,訪問頻率控制,批處理等系統。大部分的消息中間件能夠處理實時性要求高的消息/數據,但是對于隊列中大量未處理的消息/數據在持久性方面比較弱。

    kakfa的consumer使用拉的方式工作。


    安裝kafka
    下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

    > tar xzf kafka-.tgz
    > cd kafka-
    > ./sbt update
    > ./sbt package
    啟動zkserver:
    bin/zookeeper-server-start.sh config/zookeeper.properties
    啟動server:
    bin/kafka-server-start.sh config/server.properties
    就是這么簡單。


    使用kafka
    import java.util.Arrays;  
    import java.util.List;  
    import java.util.Properties;  
    import kafka.javaapi.producer.SyncProducer;  
    import kafka.javaapi.message.ByteBufferMessageSet;  
    import kafka.message.Message;  
    import kafka.producer.SyncProducerConfig;  
      
      
      
    Properties props = new Properties();  
    props.put(“zk.connect”, “127.0.0.1:2181”);  
    props.put("serializer.class", "kafka.serializer.StringEncoder");  
    ProducerConfig config = new ProducerConfig(props);  
    Producer<String, String> producer = new Producer<String, String>(config);  
      
    Send a single message  
      
    // The message is sent to a randomly selected partition registered in ZK  
    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");  
    producer.send(data);      
      
    producer.close();  


    這樣就是一個標準的producer。

    consumer的代碼
    // specify some consumer properties  
    Properties props = new Properties();  
    props.put("zk.connect", "localhost:2181");  
    props.put("zk.connectiontimeout.ms", "1000000");  
    props.put("groupid", "test_group");  
      
    // Create the connection to the cluster  
    ConsumerConfig consumerConfig = new ConsumerConfig(props);  
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
      
    // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
    Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
        consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));  
    List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
      
    // create list of 4 threads to consume from each of the partitions   
    ExecutorService executor = Executors.newFixedThreadPool(4);  
      
    // consume the messages in the threads  
    for(final KafkaMessageStream<Message> stream: streams) {  
      executor.submit(new Runnable() {  
        public void run() {  
          for(Message message: stream) {  
            // process message  
          }   
        }  
      });  
    }  





    posted @ 2013-09-08 17:32 paulwong 閱讀(570) | 評論 (0)編輯 收藏

    LOG ANALYST BIG DATA SYSTEM資源

    日志抓取端:

    apache kafka在數據處理中特別是日志和消息的處理上會有很多出色的表現,這里寫個索引,關于kafka的文章暫時就更新到這里,最近利用空閑時間在對kafka做一些功能性增強,并java化,雖然現在已經有很多這樣的版本,但是根據實際需求來改變才是最適合的。

    首先當然推薦的是kafka的官網 http://kafka.apache.org/ 

    在官網最值得參考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是參照這里的說明,大家要特別重視這篇文章,里面有好多理念都特別好,推薦多讀幾遍。

    在OSC的翻譯頻道有kafka design全中文的翻譯,翻得挺好的,推薦一下:http://www.oschina.net/translate/kafka-design

    kafka的wiki是很不錯的學習文檔:https://cwiki.apache.org/confluence/display/KAFKA/Index

    ——————————————————————————————————

    接下來就是我寫的一系列文章,文章都是循序漸進的方式帶你了解kafka:

    關于kafka的基本知識,分布式的基礎:《分布式消息系統Kafka初步》

    kafka的分布式搭建,quick start:《kafka分布式環境搭建》

    關于kafka的實現細節,這主要就是講design的部分:《細節上》《細節下》

    關于kafka開發環境,scala環境的搭建:《開發環境搭建》

    數據生產者,producer的用法:《producer的用法》《producer使用注意》

    數據消費者,consumer的用法:《consumer的用法》

    還有些零碎的,關于通信段的源碼解讀:《net包源碼解讀》《broker配置》

    ——————————————————————————————————

    擴展的閱讀還有下面這些:

    我的好友寫的關于kafka和jafka的相關博客,特別好,我有很多問題也都找他解決的,大神一般的存在:http://rockybean.github.com/   @rockybean

    kafka的java化版本jafka:https://github.com/adyliu/jafka

    淘寶的metaQ:https://github.com/killme2008/Metamorphosis

    我最近在寫的inforQ,剛開始寫,我也純粹是為了讀下源碼,不定期更新哈:https://github.com/ielts0909/inforq

    后面一階段可能更新點兒關于cas的東西吧,具體也沒想好,最近一直出差,寫代碼的時間都很少

    --------------------------------------------------------------------------------

    0.8版本的相關更新如下:

    0.8更新內容介紹:《kafka0.8版本的一些更新》

    posted @ 2013-09-08 16:21 paulwong 閱讀(223) | 評論 (0)編輯 收藏

    Install Hadoop in the AWS cloud

    1. get the Whirr tar file
      wget http://www.eu.apache.org/dist/whirr/stable/whirr-0.8.2.tar.gz
    2. untar the Whirr tar file
      tar -vxf whirr-0.8.2.tar.gz
    3. create credentials file
      mkdir ~/.whirr
      cp conf/credentials.sample ~/.whirr/credentials
    4. add the following content to credentials file
      # Set cloud provider connection details
      PROVIDER=aws-ec2
      IDENTITY=<AWS Access Key ID>
      CREDENTIAL=<AWS Secret Access Key>
    5. generate a rsa key pair
      ssh-keygen -t rsa -P ''
    6. create a hadoop.properties file and add the following content
      whirr.cluster-name=whirrhadoopcluster
      whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,2 hadoop-datanode+hadoop-tasktracker
      whirr.provider=aws-ec2
      whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
      whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
      whirr.hadoop.version=1.0.2
      whirr.aws-ec2-spot-price=0.08
    7. launch hadoop
      bin/whirr launch-cluster --config hadoop.properties
    8. launch proxy
      cd ~/.whirr/whirrhadoopcluster/
      ./hadoop-proxy.sh
    9. add a rule to iptables
      0.0.0.0/0 50030
      0.0.0.0/0 50070
    10. check the web ui in the browser
      http://<aws-public-dns>:50030
    11. add to /etc/profile
      export HADOOP_CONF_DIR=~/.whirr/whirrhadoopcluster/
    12. check if the hadoop works
      hadoop fs -ls /

















    posted @ 2013-09-08 13:45 paulwong 閱讀(409) | 評論 (0)編輯 收藏

    Redis與Memcached的區別(轉)

     如果簡單地比較Redis與Memcached的區別,大多數都會得到以下觀點:
    1 Redis不僅僅支持簡單的k/v類型的數據,同時還提供list,set,hash等數據結構的存儲。
    2 Redis支持數據的備份,即master-slave模式的數據備份。
    3 Redis支持數據的持久化,可以將內存中的數據保持在磁盤中,重啟的時候可以再次加載進行使用。

    在Redis中,并不是所有的數據都一直存儲在內存中的。這是和Memcached相比一個最大的區別(我個人是這么認為的)。

    Redis只會緩存所有的key的信息,如果Redis發現內存的使用量超過了某一個閥值,將觸發swap的操作,Redis根據“swappability = age*log(size_in_memory)”計算出哪些key對應的value需要swap到磁盤。然后再將這些key對應的value持久化到磁盤中,同時在內存中清除。這種特性使得Redis可以保持超過其機器本身內存大小的數據。當然,機器本身的內存必須要能夠保持所有的key,畢竟這些數據是不會進行swap操作的。

    同時由于Redis將內存中的數據swap到磁盤中的時候,提供服務的主線程和進行swap操作的子線程會共享這部分內存,所以如果更新需要swap的數據,Redis將阻塞這個操作,直到子線程完成swap操作后才可以進行修改。

    可以參考使用Redis特有內存模型前后的情況對比:

    VM off: 300k keys, 4096 bytes values: 1.3G used
    VM on: 300k keys, 4096 bytes values: 73M used
    VM off: 1 million keys, 256 bytes values: 430.12M used
    VM on: 1 million keys, 256 bytes values: 160.09M used
    VM on: 1 million keys, values as large as you want, still: 160.09M used 

    當從Redis中讀取數據的時候,如果讀取的key對應的value不在內存中,那么Redis就需要從swap文件中加載相應數據,然后再返回給請求方。這里就存在一個I/O線程池的問題。在默認的情況下,Redis會出現阻塞,即完成所有的swap文件加載后才會相應。這種策略在客戶端的數量較小,進行批量操作的時候比較合適。但是如果將Redis應用在一個大型的網站應用程序中,這顯然是無法滿足大并發的情況的。所以Redis運行我們設置I/O線程池的大小,對需要從swap文件中加載相應數據的讀取請求進行并發操作,減少阻塞的時間。

    redis、memcache、mongoDB 對比
    從以下幾個維度,對redis、memcache、mongoDB 做了對比,歡迎拍磚

    1、性能
    都比較高,性能對我們來說應該都不是瓶頸
    總體來講,TPS方面redis和memcache差不多,要大于mongodb


    2、操作的便利性
    memcache數據結構單一
    redis豐富一些,數據操作方面,redis更好一些,較少的網絡IO次數
    mongodb支持豐富的數據表達,索引,最類似關系型數據庫,支持的查詢語言非常豐富


    3、內存空間的大小和數據量的大小
    redis在2.0版本后增加了自己的VM特性,突破物理內存的限制;可以對key value設置過期時間(類似memcache)
    memcache可以修改最大可用內存,采用LRU算法
    mongoDB適合大數據量的存儲,依賴操作系統VM做內存管理,吃內存也比較厲害,服務不要和別的服務在一起

    4、可用性(單點問題)

    對于單點問題,
    redis,依賴客戶端來實現分布式讀寫;主從復制時,每次從節點重新連接主節點都要依賴整個快照,無增量復制,因性能和效率問題,
    所以單點問題比較復雜;不支持自動sharding,需要依賴程序設定一致hash 機制。
    一種替代方案是,不用redis本身的復制機制,采用自己做主動復制(多份存儲),或者改成增量復制的方式(需要自己實現),一致性問題和性能的權衡

    Memcache本身沒有數據冗余機制,也沒必要;對于故障預防,采用依賴成熟的hash或者環狀的算法,解決單點故障引起的抖動問題。

    mongoDB支持master-slave,replicaset(內部采用paxos選舉算法,自動故障恢復),auto sharding機制,對客戶端屏蔽了故障轉移和切分機制。


    5、可靠性(持久化)

    對于數據持久化和數據恢復,

    redis支持(快照、AOF):依賴快照進行持久化,aof增強了可靠性的同時,對性能有所影響

    memcache不支持,通常用在做緩存,提升性能;

    MongoDB從1.8版本開始采用binlog方式支持持久化的可靠性


    6、數據一致性(事務支持)

    Memcache 在并發場景下,用cas保證一致性

    redis事務支持比較弱,只能保證事務中的每個操作連續執行

    mongoDB不支持事務


    7、數據分析

    mongoDB內置了數據分析的功能(mapreduce),其他不支持


    8、應用場景
    redis:數據量較小的更性能操作和運算上

    memcache:用于在動態系統中減少數據庫負載,提升性能;做緩存,提高性能(適合讀多寫少,對于數據量比較大,可以采用sharding)

    MongoDB:主要解決海量數據的訪問效率問題

    posted @ 2013-09-06 11:12 paulwong 閱讀(17847) | 評論 (0)編輯 收藏

    MONGODB資源

    R利劍NoSQL系列文章 之 MongoDB
    http://cos.name/2013/04/nosql1-rmongodb/

    MongoDB在盛大大數據量項目中的應用
    http://www.infoq.com/cn/presentations/glj-mongodb-in-sdo


















    posted @ 2013-09-01 13:25 paulwong 閱讀(355) | 評論 (0)編輯 收藏

    REDIS資源

    R利劍NoSQL系列文章 之 MongoDB
    http://cos.name/2013/04/nosql-r-redis/

    REDIS書籍
    http://abcfy2.gitbooks.io/redis-in-action-reading-notes/getting_to_know_redis/session1.html


















    posted @ 2013-09-01 13:21 paulwong 閱讀(362) | 評論 (0)編輯 收藏

    HIVE資源

    Hive是建立在Hadoop上的數據倉庫基礎構架。它提供了一系列的工具,可以用來進行數據提取轉化加載(ETL),這是一種可以存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制。Hive 定義了簡單的類 SQL 查詢語言,稱為 HQL,它允許熟悉 SQL 的用戶查詢數據。同時,這個語言也允許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 無法完成的復雜的分析工作。


    Hive 沒有專門的數據格式。 Hive 可以很好的工作在 Thrift 之上,控制分隔符,也允許用戶指定數據格式


    hive與關系數據庫的區別:

    數據存儲不同:hive基于hadoop的HDFS,關系數據庫則基于本地文件系統

    計算模型不同:hive基于hadoop的mapreduce,關系數據庫則基于索引的內存計算模型

    應用場景不同:hive是OLAP數據倉庫系統提供海量數據查詢的,實時性很差;關系數據庫是OLTP事務系統,為實時查詢業務服務

    擴展性不同:hive基于hadoop很容易通過分布式增加存儲能力和計算能力,關系數據庫水平擴展很難,要不斷增加單機的性能


    Hive安裝及使用攻略
    http://blog.fens.me/hadoop-hive-intro/


    R利劍NoSQL系列文章 之 Hive
    http://cos.name/2013/07/r-nosql-hive/








    posted @ 2013-09-01 12:41 paulwong 閱讀(416) | 評論 (0)編輯 收藏

    分布式搜索資源

    云端分布式搜索技術
    http://www.searchtech.pro


    ELASTICSEARCH中文社區
    http://es-bbs.medcl.net/categories/%E6%9C%80%E6%96%B0%E5%8A%A8%E6%80%81


    http://wangwei3.iteye.com/blog/1818599


    Welcome to the Apache Nutch Wiki
    https://wiki.apache.org/nutch/FrontPage


    elasticsearch客戶端大全
    http://www.searchtech.pro/elasticsearch-clients


    客戶端
    http://es-cn.medcl.net/guide/concepts/scaling-lucene/
    https://github.com/aglover/elasticsearch_article/blob/master/src/main/java/com/b50/usat/load/MusicReviewSearch.java


     

    posted @ 2013-08-31 15:52 paulwong 閱讀(406) | 評論 (0)編輯 收藏

    Install hadoop+hbase+nutch+elasticsearch

         摘要: This document is for Anyela Chavarro. Only these version of each framework work together Code highlighting produced by Actipro CodeHighlighter (freeware) http://www.CodeHighlighter.com/ -->H...  閱讀全文

    posted @ 2013-08-31 01:17 paulwong 閱讀(6308) | 評論 (3)編輯 收藏

    Implementation for CombineFileInputFormat Hadoop 0.20.205

    運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優化,使多個文件能合成一個MAP JOB的輸入。

    具體的原理是下述三步:

    1.根據輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

    2. 現在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.

    3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
    舉例: mapred.max.split.size=1000
    mapred.min.split.size.per.node=300
    mapred.min.split.size.per.rack=100
    輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
    經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
    由于兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.
    第三步,合并四個碎片成一個split, 長度為150.

    如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.

    其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。

    注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。


    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    @SuppressWarnings("deprecation")
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

            return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
        }

        public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
            private final LineRecordReader linerecord;

            public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                linerecord = new LineRecordReader(conf, filesplit);
            }

            @Override
            public void close() throws IOException {
                linerecord.close();

            }

            @Override
            public LongWritable createKey() {
                // TODO Auto-generated method stub
                return linerecord.createKey();
            }

            @Override
            public Text createValue() {
                // TODO Auto-generated method stub
                return linerecord.createValue();
            }

            @Override
            public long getPos() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getPos();
            }

            @Override
            public float getProgress() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getProgress();
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {

                // TODO Auto-generated method stub
                return linerecord.next(key, value);
            }

        }
    }


    在運行時這樣設置:

    if (argument != null) {
                    conf.set("mapred.max.split.size", argument);
                } else {
                    conf.set("mapred.max.split.size", "134217728"); // 128 MB
                }
    //

                conf.setInputFormat(CombinedInputFormat.class);


    posted @ 2013-08-29 16:08 paulwong 閱讀(385) | 評論 (0)編輯 收藏

    僅列出標題
    共115頁: First 上一頁 61 62 63 64 65 66 67 68 69 下一頁 Last 
    主站蜘蛛池模板: 夜夜亚洲天天久久| 一级做a爰片久久毛片免费看| 亚洲一卡2卡3卡4卡5卡6卡| 色吊丝最新永久免费观看网站| 一级做α爱过程免费视频| 久久亚洲AV成人无码电影| 免费看AV毛片一区二区三区| 中文永久免费观看网站| 亚洲天堂福利视频| 亚洲精品综合久久| 四虎国产精品免费久久| a级毛片毛片免费观看久潮| 亚洲综合无码一区二区痴汉| 久久91亚洲人成电影网站| 日韩中文字幕免费| 免费无遮挡无码永久视频| 老司机午夜免费视频| 亚洲大片免费观看| 国产成人A亚洲精V品无码| 精品剧情v国产在免费线观看| 99爱免费观看视频在线| 一级成人a免费视频| 久久精品国产99国产精品亚洲| 中文国产成人精品久久亚洲精品AⅤ无码精品| 国产精品入口麻豆免费观看| 中文字幕乱理片免费完整的| 亚洲精品无码专区在线| 精品日韩亚洲AV无码| 亚洲综合AV在线在线播放| 国产午夜免费秋霞影院| ww在线观视频免费观看| a国产成人免费视频| 亚洲黄网在线观看| 亚洲精品无码专区在线在线播放| 免费无码看av的网站| 青青草a免费线观a| 色欲国产麻豆一精品一AV一免费 | 亚洲精品免费视频| 久久久久久久亚洲精品| 四虎国产精品免费视| 久久不见久久见免费影院|