#
kafka是一種高吞吐量的分布式發布訂閱消息系統,她有如下特性:
通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數十萬的消息。
支持通過kafka服務器和消費機集群來分區消息。
支持Hadoop并行數據加載。
設計側重高吞吐量,用于好友動態,相關性統計,排行統計,訪問頻率控制,批處理等系統。大部分的消息中間件能夠處理實時性要求高的消息/數據,但是對于隊列中大量未處理的消息/數據在持久性方面比較弱。
kakfa的consumer使用拉的方式工作。
安裝kafka下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
啟動zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動server:
bin/kafka-server-start.sh config/server.properties
就是這么簡單。
使用kafka
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
Properties props =
new Properties();
props.put(“zk.connect”, “127.0.0.1:2181”);
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Send a single message
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
這樣就是一個標準的producer。
consumer的代碼
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaMessageStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(Message message: stream) {
// process message
}
}
});
}
日志抓取端:
apache kafka在數據處理中特別是日志和消息的處理上會有很多出色的表現,這里寫個索引,關于kafka的文章暫時就更新到這里,最近利用空閑時間在對kafka做一些功能性增強,并java化,雖然現在已經有很多這樣的版本,但是根據實際需求來改變才是最適合的。
首先當然推薦的是kafka的官網 http://kafka.apache.org/
在官網最值得參考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是參照這里的說明,大家要特別重視這篇文章,里面有好多理念都特別好,推薦多讀幾遍。
在OSC的翻譯頻道有kafka design全中文的翻譯,翻得挺好的,推薦一下:http://www.oschina.net/translate/kafka-design
kafka的wiki是很不錯的學習文檔:https://cwiki.apache.org/confluence/display/KAFKA/Index
——————————————————————————————————
接下來就是我寫的一系列文章,文章都是循序漸進的方式帶你了解kafka:
關于kafka的基本知識,分布式的基礎:《分布式消息系統Kafka初步》
kafka的分布式搭建,quick start:《kafka分布式環境搭建》
關于kafka的實現細節,這主要就是講design的部分:《細節上》、《細節下》
關于kafka開發環境,scala環境的搭建:《開發環境搭建》
數據生產者,producer的用法:《producer的用法》、《producer使用注意》
數據消費者,consumer的用法:《consumer的用法》
還有些零碎的,關于通信段的源碼解讀:《net包源碼解讀》、《broker配置》
——————————————————————————————————
擴展的閱讀還有下面這些:
我的好友寫的關于kafka和jafka的相關博客,特別好,我有很多問題也都找他解決的,大神一般的存在:http://rockybean.github.com/ @rockybean
kafka的java化版本jafka:https://github.com/adyliu/jafka
淘寶的metaQ:https://github.com/killme2008/Metamorphosis
我最近在寫的inforQ,剛開始寫,我也純粹是為了讀下源碼,不定期更新哈:https://github.com/ielts0909/inforq
后面一階段可能更新點兒關于cas的東西吧,具體也沒想好,最近一直出差,寫代碼的時間都很少
--------------------------------------------------------------------------------
0.8版本的相關更新如下:
0.8更新內容介紹:《kafka0.8版本的一些更新》
- get the Whirr tar file
wget http://www.eu.apache.org/dist/whirr/stable/whirr-0.8.2.tar.gz
- untar the Whirr tar file
tar -vxf whirr-0.8.2.tar.gz
- create credentials file
mkdir ~/.whirr
cp conf/credentials.sample ~/.whirr/credentials
- add the following content to credentials file
# Set cloud provider connection details
PROVIDER=aws-ec2
IDENTITY=<AWS Access Key ID>
CREDENTIAL=<AWS Secret Access Key>
generate a rsa key pair
ssh-keygen -t rsa -P ''
- create a hadoop.properties file and add the following content
whirr.cluster-name=whirrhadoopcluster
whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,2 hadoop-datanode+hadoop-tasktracker
whirr.provider=aws-ec2
whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
whirr.hadoop.version=1.0.2
whirr.aws-ec2-spot-price=0.08
- launch hadoop
bin/whirr launch-cluster --config hadoop.properties
- launch proxy
cd ~/.whirr/whirrhadoopcluster/
./hadoop-proxy.sh
- add a rule to iptables
0.0.0.0/0 50030
0.0.0.0/0 50070
- check the web ui in the browser
http://<aws-public-dns>:50030
- add to /etc/profile
export HADOOP_CONF_DIR=~/.whirr/whirrhadoopcluster/
- check if the hadoop works
hadoop fs -ls /
如果簡單地比較Redis與Memcached的區別,大多數都會得到以下觀點:
1 Redis不僅僅支持簡單的k/v類型的數據,同時還提供list,set,hash等數據結構的存儲。
2 Redis支持數據的備份,即master-slave模式的數據備份。
3 Redis支持數據的持久化,可以將內存中的數據保持在磁盤中,重啟的時候可以再次加載進行使用。
在Redis中,并不是所有的數據都一直存儲在內存中的。這是和Memcached相比一個最大的區別(我個人是這么認為的)。
Redis只會緩存所有的key的信息,如果Redis發現內存的使用量超過了某一個閥值,將觸發swap的操作,Redis根據“swappability = age*log(size_in_memory)”計算出哪些key對應的value需要swap到磁盤。然后再將這些key對應的value持久化到磁盤中,同時在內存中清除。這種特性使得Redis可以保持超過其機器本身內存大小的數據。當然,機器本身的內存必須要能夠保持所有的key,畢竟這些數據是不會進行swap操作的。
同時由于Redis將內存中的數據swap到磁盤中的時候,提供服務的主線程和進行swap操作的子線程會共享這部分內存,所以如果更新需要swap的數據,Redis將阻塞這個操作,直到子線程完成swap操作后才可以進行修改。
可以參考使用Redis特有內存模型前后的情況對比:
VM off: 300k keys, 4096 bytes values: 1.3G used
VM on: 300k keys, 4096 bytes values: 73M used
VM off: 1 million keys, 256 bytes values: 430.12M used
VM on: 1 million keys, 256 bytes values: 160.09M used
VM on: 1 million keys, values as large as you want, still: 160.09M used
當從Redis中讀取數據的時候,如果讀取的key對應的value不在內存中,那么Redis就需要從swap文件中加載相應數據,然后再返回給請求方。這里就存在一個I/O線程池的問題。在默認的情況下,Redis會出現阻塞,即完成所有的swap文件加載后才會相應。這種策略在客戶端的數量較小,進行批量操作的時候比較合適。但是如果將Redis應用在一個大型的網站應用程序中,這顯然是無法滿足大并發的情況的。所以Redis運行我們設置I/O線程池的大小,對需要從swap文件中加載相應數據的讀取請求進行并發操作,減少阻塞的時間。
redis、memcache、mongoDB 對比
從以下幾個維度,對redis、memcache、mongoDB 做了對比,歡迎拍磚
1、性能
都比較高,性能對我們來說應該都不是瓶頸
總體來講,TPS方面redis和memcache差不多,要大于mongodb
2、操作的便利性
memcache數據結構單一
redis豐富一些,數據操作方面,redis更好一些,較少的網絡IO次數
mongodb支持豐富的數據表達,索引,最類似關系型數據庫,支持的查詢語言非常豐富
3、內存空間的大小和數據量的大小
redis在2.0版本后增加了自己的VM特性,突破物理內存的限制;可以對key value設置過期時間(類似memcache)
memcache可以修改最大可用內存,采用LRU算法
mongoDB適合大數據量的存儲,依賴操作系統VM做內存管理,吃內存也比較厲害,服務不要和別的服務在一起
4、可用性(單點問題)
對于單點問題,
redis,依賴客戶端來實現分布式讀寫;主從復制時,每次從節點重新連接主節點都要依賴整個快照,無增量復制,因性能和效率問題,
所以單點問題比較復雜;不支持自動sharding,需要依賴程序設定一致hash 機制。
一種替代方案是,不用redis本身的復制機制,采用自己做主動復制(多份存儲),或者改成增量復制的方式(需要自己實現),一致性問題和性能的權衡
Memcache本身沒有數據冗余機制,也沒必要;對于故障預防,采用依賴成熟的hash或者環狀的算法,解決單點故障引起的抖動問題。
mongoDB支持master-slave,replicaset(內部采用paxos選舉算法,自動故障恢復),auto sharding機制,對客戶端屏蔽了故障轉移和切分機制。
5、可靠性(持久化)
對于數據持久化和數據恢復,
redis支持(快照、AOF):依賴快照進行持久化,aof增強了可靠性的同時,對性能有所影響
memcache不支持,通常用在做緩存,提升性能;
MongoDB從1.8版本開始采用binlog方式支持持久化的可靠性
6、數據一致性(事務支持)
Memcache 在并發場景下,用cas保證一致性
redis事務支持比較弱,只能保證事務中的每個操作連續執行
mongoDB不支持事務
7、數據分析
mongoDB內置了數據分析的功能(mapreduce),其他不支持
8、應用場景
redis:數據量較小的更性能操作和運算上
memcache:用于在動態系統中減少數據庫負載,提升性能;做緩存,提高性能(適合讀多寫少,對于數據量比較大,可以采用sharding)
MongoDB:主要解決海量數據的訪問效率問題
Hive是建立在Hadoop上的數據倉庫基礎構架。它提供了一系列的工具,可以用來進行數據提取轉化加載(ETL),這是一種可以存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制。Hive 定義了簡單的類 SQL 查詢語言,稱為 HQL,它允許熟悉 SQL 的用戶查詢數據。同時,這個語言也允許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 無法完成的復雜的分析工作。
Hive 沒有專門的數據格式。 Hive 可以很好的工作在 Thrift 之上,控制分隔符,也允許用戶指定數據格式
hive與關系數據庫的區別:
數據存儲不同:hive基于hadoop的HDFS,關系數據庫則基于本地文件系統
計算模型不同:hive基于hadoop的mapreduce,關系數據庫則基于索引的內存計算模型
應用場景不同:hive是OLAP數據倉庫系統提供海量數據查詢的,實時性很差;關系數據庫是OLTP事務系統,為實時查詢業務服務
擴展性不同:hive基于hadoop很容易通過分布式增加存儲能力和計算能力,關系數據庫水平擴展很難,要不斷增加單機的性能
Hive安裝及使用攻略
http://blog.fens.me/hadoop-hive-intro/
R利劍NoSQL系列文章 之 Hive
http://cos.name/2013/07/r-nosql-hive/
摘要: This document is for Anyela Chavarro.
Only these version of each framework work together
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
-->H...
閱讀全文
運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優化,使多個文件能合成一個MAP JOB的輸入。
具體的原理是下述三步:
1.根據輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.
2. 現在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.
3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
舉例: mapred.max.split.size=1000
mapred.min.split.size.per.node=300
mapred.min.split.size.per.rack=100
輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
由于兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.
第三步,合并四個碎片成一個split, 長度為150.
如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.
其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。
注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
}
public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
private final LineRecordReader linerecord;
public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}
@Override
public void close() throws IOException {
linerecord.close();
}
@Override
public LongWritable createKey() {
// TODO Auto-generated method stub
return linerecord.createKey();
}
@Override
public Text createValue() {
// TODO Auto-generated method stub
return linerecord.createValue();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return linerecord.getPos();
}
@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return linerecord.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
// TODO Auto-generated method stub
return linerecord.next(key, value);
}
}
}
在運行時這樣設置:
if (argument !=
null) {
conf.set("mapred.max.split.size", argument);
}
else {
conf.set("mapred.max.split.size", "134217728");
// 128 MB
}
//
conf.setInputFormat(CombinedInputFormat.
class);