對(duì)于一堆時(shí)刻在增長的數(shù)據(jù),如果要統(tǒng)計(jì),可以采取什么方法呢?
- 等數(shù)據(jù)增長到一定程度的時(shí)候,跑一個(gè)統(tǒng)計(jì)程序進(jìn)行統(tǒng)計(jì)。適用于實(shí)時(shí)性要求不高的場(chǎng)景。
如將數(shù)據(jù)導(dǎo)到HDFS,再運(yùn)行一個(gè)MAP REDUCE JOB。
- 如果實(shí)時(shí)性要求高的,上面的方法就不行了。因此就帶來第二種方法。
在數(shù)據(jù)每次增長一筆的時(shí)候,就進(jìn)行統(tǒng)計(jì)JOB,結(jié)果放到DB或搜索引擎的INDEX中。
STORM就是完成這種工作的。
HADOOP與STORM比較
- 數(shù)據(jù)來源:HADOOP是HDFS上某個(gè)文件夾下的可能是成TB的數(shù)據(jù),STORM是實(shí)時(shí)新增的某一筆數(shù)據(jù)
- 處理過程:HADOOP是分MAP階段到REDUCE階段,STORM是由用戶定義處理流程,
流程中可以包含多個(gè)步驟,每個(gè)步驟可以是數(shù)據(jù)源(SPOUT)或處理邏輯(BOLT) - 是否結(jié)束:HADOOP最后是要結(jié)束的,STORM是沒有結(jié)束狀態(tài),到最后一步時(shí),就停在那,直到有新
數(shù)據(jù)進(jìn)入時(shí)再從頭開始 - 處理速度:HADOOP是以處理HDFS上大量數(shù)據(jù)為目的,速度慢,STORM是只要處理新增的某一筆數(shù)據(jù)即可
可以做到很快。 - 適用場(chǎng)景:HADOOP是在要處理一批數(shù)據(jù)時(shí)用的,不講究時(shí)效性,要處理就提交一個(gè)JOB,STORM是要處理
某一新增數(shù)據(jù)時(shí)用的,要講時(shí)效性
- 與MQ對(duì)比:HADOOP沒有對(duì)比性,STORM可以看作是有N個(gè)步驟,每個(gè)步驟處理完就向下一個(gè)MQ發(fā)送消息,
監(jiān)聽這個(gè)MQ的消費(fèi)者繼續(xù)處理
首先要明白Storm和Hadoop的應(yīng)用領(lǐng)域,注意加粗、標(biāo)紅的關(guān)鍵字。
Hadoop是基于Map/Reduce模型的,處理海量數(shù)據(jù)的離線分析工具。
Storm是分布式的、實(shí)時(shí)數(shù)據(jù)流分析工具,數(shù)據(jù)是源源不斷產(chǎn)生的,例如Twitter的Timeline。
再回到你說的速度問題,只能說Storm更適用于實(shí)時(shí)數(shù)據(jù)流,Map/Reduce模型在實(shí)時(shí)領(lǐng)域很難有所發(fā)揮,不能簡(jiǎn)單粗暴的說誰快誰慢。
這里的快主要是指的時(shí)延。
storm的網(wǎng)絡(luò)直傳、內(nèi)存計(jì)算,其時(shí)延必然比hadoop的通過hdfs傳輸?shù)偷枚啵划?dāng)計(jì)算模型比較適合流式時(shí),storm的流式處理,省去了批處理的收集數(shù)據(jù)的時(shí)間;因?yàn)閟torm是服務(wù)型的作業(yè),也省去了作業(yè)調(diào)度的時(shí)延。所以從時(shí)延上來看,storm要快于hadoop。
說一個(gè)典型的場(chǎng)景,幾千個(gè)日志生產(chǎn)方產(chǎn)生日志文件,需要進(jìn)行一些ETL操作存入一個(gè)數(shù)據(jù)庫。
假設(shè)利用hadoop,則需要先存入hdfs,按每一分鐘切一個(gè)文件的粒度來算(這個(gè)粒度已經(jīng)極端的細(xì)了,再小的話hdfs上會(huì)一堆小文件),hadoop開始計(jì)算時(shí),1分鐘已經(jīng)過去了,然后再開始調(diào)度任務(wù)又花了一分鐘,然后作業(yè)運(yùn)行起來,假設(shè)機(jī)器特別多,幾鈔鐘就算完了,然后寫數(shù)據(jù)庫假設(shè)也花了很少的時(shí)間,這樣,從數(shù)據(jù)產(chǎn)生到最后可以使用已經(jīng)過去了至少兩分多鐘。
而流式計(jì)算則是數(shù)據(jù)產(chǎn)生時(shí),則有一個(gè)程序去一直監(jiān)控日志的產(chǎn)生,產(chǎn)生一行就通過一個(gè)傳輸系統(tǒng)發(fā)給流式計(jì)算系統(tǒng),然后流式計(jì)算系統(tǒng)直接處理,處理完之后直接寫入數(shù)據(jù)庫,每條數(shù)據(jù)從產(chǎn)生到寫入數(shù)據(jù)庫,在資源充足時(shí)可以在毫秒級(jí)別完成。
當(dāng)然,跑一個(gè)大文件的wordcount,本來就是一個(gè)批處理計(jì)算的模型,你非要把它放到storm上進(jìn)行流式的處理,然后又非要讓等所有已有數(shù)據(jù)處理完才讓storm輸出結(jié)果,這時(shí)候,你再把它和hadoop比較快慢,這時(shí),其實(shí)比較的不是時(shí)延,而是比較的吞吐了。
Hadoop M/R基于HDFS,需要切分輸入數(shù)據(jù)、產(chǎn)生中間數(shù)據(jù)文件、排序、數(shù)據(jù)壓縮、多份復(fù)制等,效率較低。
Storm 基于ZeroMQ這個(gè)高性能的消息通訊庫,不持久化數(shù)據(jù)。
kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),她有如下特性:
通過O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長時(shí)間的穩(wěn)定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)十萬的消息。
支持通過kafka服務(wù)器和消費(fèi)機(jī)集群來分區(qū)消息。
支持Hadoop并行數(shù)據(jù)加載。
設(shè)計(jì)側(cè)重高吞吐量,用于好友動(dòng)態(tài),相關(guān)性統(tǒng)計(jì),排行統(tǒng)計(jì),訪問頻率控制,批處理等系統(tǒng)。大部分的消息中間件能夠處理實(shí)時(shí)性要求高的消息/數(shù)據(jù),但是對(duì)于隊(duì)列中大量未處理的消息/數(shù)據(jù)在持久性方面比較弱。
kakfa的consumer使用拉的方式工作。
安裝kafka下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
啟動(dòng)zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動(dòng)server:
bin/kafka-server-start.sh config/server.properties
就是這么簡(jiǎn)單。
使用kafka
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
Properties props =
new Properties();
props.put(“zk.connect”, “127.0.0.1:2181”);
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Send a single message
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
這樣就是一個(gè)標(biāo)準(zhǔn)的producer。
consumer的代碼
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaMessageStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(Message message: stream) {
// process message
}
}
});
}
日志抓取端:
apache kafka在數(shù)據(jù)處理中特別是日志和消息的處理上會(huì)有很多出色的表現(xiàn),這里寫個(gè)索引,關(guān)于kafka的文章暫時(shí)就更新到這里,最近利用空閑時(shí)間在對(duì)kafka做一些功能性增強(qiáng),并java化,雖然現(xiàn)在已經(jīng)有很多這樣的版本,但是根據(jù)實(shí)際需求來改變才是最適合的。
首先當(dāng)然推薦的是kafka的官網(wǎng) http://kafka.apache.org/
在官網(wǎng)最值得參考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是參照這里的說明,大家要特別重視這篇文章,里面有好多理念都特別好,推薦多讀幾遍。
在OSC的翻譯頻道有kafka design全中文的翻譯,翻得挺好的,推薦一下:http://www.oschina.net/translate/kafka-design
kafka的wiki是很不錯(cuò)的學(xué)習(xí)文檔:https://cwiki.apache.org/confluence/display/KAFKA/Index
——————————————————————————————————
接下來就是我寫的一系列文章,文章都是循序漸進(jìn)的方式帶你了解kafka:
關(guān)于kafka的基本知識(shí),分布式的基礎(chǔ):《分布式消息系統(tǒng)Kafka初步》
kafka的分布式搭建,quick start:《kafka分布式環(huán)境搭建》
關(guān)于kafka的實(shí)現(xiàn)細(xì)節(jié),這主要就是講design的部分:《細(xì)節(jié)上》、《細(xì)節(jié)下》
關(guān)于kafka開發(fā)環(huán)境,scala環(huán)境的搭建:《開發(fā)環(huán)境搭建》
數(shù)據(jù)生產(chǎn)者,producer的用法:《producer的用法》、《producer使用注意》
數(shù)據(jù)消費(fèi)者,consumer的用法:《consumer的用法》
還有些零碎的,關(guān)于通信段的源碼解讀:《net包源碼解讀》、《broker配置》
——————————————————————————————————
擴(kuò)展的閱讀還有下面這些:
我的好友寫的關(guān)于kafka和jafka的相關(guān)博客,特別好,我有很多問題也都找他解決的,大神一般的存在:http://rockybean.github.com/ @rockybean
kafka的java化版本jafka:https://github.com/adyliu/jafka
淘寶的metaQ:https://github.com/killme2008/Metamorphosis
我最近在寫的inforQ,剛開始寫,我也純粹是為了讀下源碼,不定期更新哈:https://github.com/ielts0909/inforq
后面一階段可能更新點(diǎn)兒關(guān)于cas的東西吧,具體也沒想好,最近一直出差,寫代碼的時(shí)間都很少
--------------------------------------------------------------------------------
0.8版本的相關(guān)更新如下:
0.8更新內(nèi)容介紹:《kafka0.8版本的一些更新》
如果簡(jiǎn)單地比較Redis與Memcached的區(qū)別,大多數(shù)都會(huì)得到以下觀點(diǎn):
1 Redis不僅僅支持簡(jiǎn)單的k/v類型的數(shù)據(jù),同時(shí)還提供list,set,hash等數(shù)據(jù)結(jié)構(gòu)的存儲(chǔ)。
2 Redis支持?jǐn)?shù)據(jù)的備份,即master-slave模式的數(shù)據(jù)備份。
3 Redis支持?jǐn)?shù)據(jù)的持久化,可以將內(nèi)存中的數(shù)據(jù)保持在磁盤中,重啟的時(shí)候可以再次加載進(jìn)行使用。
在Redis中,并不是所有的數(shù)據(jù)都一直存儲(chǔ)在內(nèi)存中的。這是和Memcached相比一個(gè)最大的區(qū)別(我個(gè)人是這么認(rèn)為的)。
Redis只會(huì)緩存所有的key的信息,如果Redis發(fā)現(xiàn)內(nèi)存的使用量超過了某一個(gè)閥值,將觸發(fā)swap的操作,Redis根據(jù)“swappability = age*log(size_in_memory)”計(jì)算出哪些key對(duì)應(yīng)的value需要swap到磁盤。然后再將這些key對(duì)應(yīng)的value持久化到磁盤中,同時(shí)在內(nèi)存中清除。這種特性使得Redis可以保持超過其機(jī)器本身內(nèi)存大小的數(shù)據(jù)。當(dāng)然,機(jī)器本身的內(nèi)存必須要能夠保持所有的key,畢竟這些數(shù)據(jù)是不會(huì)進(jìn)行swap操作的。
同時(shí)由于Redis將內(nèi)存中的數(shù)據(jù)swap到磁盤中的時(shí)候,提供服務(wù)的主線程和進(jìn)行swap操作的子線程會(huì)共享這部分內(nèi)存,所以如果更新需要swap的數(shù)據(jù),Redis將阻塞這個(gè)操作,直到子線程完成swap操作后才可以進(jìn)行修改。
可以參考使用Redis特有內(nèi)存模型前后的情況對(duì)比:
VM off: 300k keys, 4096 bytes values: 1.3G used
VM on: 300k keys, 4096 bytes values: 73M used
VM off: 1 million keys, 256 bytes values: 430.12M used
VM on: 1 million keys, 256 bytes values: 160.09M used
VM on: 1 million keys, values as large as you want, still: 160.09M used
當(dāng)從Redis中讀取數(shù)據(jù)的時(shí)候,如果讀取的key對(duì)應(yīng)的value不在內(nèi)存中,那么Redis就需要從swap文件中加載相應(yīng)數(shù)據(jù),然后再返回給請(qǐng)求方。這里就存在一個(gè)I/O線程池的問題。在默認(rèn)的情況下,Redis會(huì)出現(xiàn)阻塞,即完成所有的swap文件加載后才會(huì)相應(yīng)。這種策略在客戶端的數(shù)量較小,進(jìn)行批量操作的時(shí)候比較合適。但是如果將Redis應(yīng)用在一個(gè)大型的網(wǎng)站應(yīng)用程序中,這顯然是無法滿足大并發(fā)的情況的。所以Redis運(yùn)行我們?cè)O(shè)置I/O線程池的大小,對(duì)需要從swap文件中加載相應(yīng)數(shù)據(jù)的讀取請(qǐng)求進(jìn)行并發(fā)操作,減少阻塞的時(shí)間。
redis、memcache、mongoDB 對(duì)比
從以下幾個(gè)維度,對(duì)redis、memcache、mongoDB 做了對(duì)比,歡迎拍磚
1、性能
都比較高,性能對(duì)我們來說應(yīng)該都不是瓶頸
總體來講,TPS方面redis和memcache差不多,要大于mongodb
2、操作的便利性
memcache數(shù)據(jù)結(jié)構(gòu)單一
redis豐富一些,數(shù)據(jù)操作方面,redis更好一些,較少的網(wǎng)絡(luò)IO次數(shù)
mongodb支持豐富的數(shù)據(jù)表達(dá),索引,最類似關(guān)系型數(shù)據(jù)庫,支持的查詢語言非常豐富
3、內(nèi)存空間的大小和數(shù)據(jù)量的大小
redis在2.0版本后增加了自己的VM特性,突破物理內(nèi)存的限制;可以對(duì)key value設(shè)置過期時(shí)間(類似memcache)
memcache可以修改最大可用內(nèi)存,采用LRU算法
mongoDB適合大數(shù)據(jù)量的存儲(chǔ),依賴操作系統(tǒng)VM做內(nèi)存管理,吃內(nèi)存也比較厲害,服務(wù)不要和別的服務(wù)在一起
4、可用性(單點(diǎn)問題)
對(duì)于單點(diǎn)問題,
redis,依賴客戶端來實(shí)現(xiàn)分布式讀寫;主從復(fù)制時(shí),每次從節(jié)點(diǎn)重新連接主節(jié)點(diǎn)都要依賴整個(gè)快照,無增量復(fù)制,因性能和效率問題,
所以單點(diǎn)問題比較復(fù)雜;不支持自動(dòng)sharding,需要依賴程序設(shè)定一致hash 機(jī)制。
一種替代方案是,不用redis本身的復(fù)制機(jī)制,采用自己做主動(dòng)復(fù)制(多份存儲(chǔ)),或者改成增量復(fù)制的方式(需要自己實(shí)現(xiàn)),一致性問題和性能的權(quán)衡
Memcache本身沒有數(shù)據(jù)冗余機(jī)制,也沒必要;對(duì)于故障預(yù)防,采用依賴成熟的hash或者環(huán)狀的算法,解決單點(diǎn)故障引起的抖動(dòng)問題。
mongoDB支持master-slave,replicaset(內(nèi)部采用paxos選舉算法,自動(dòng)故障恢復(fù)),auto sharding機(jī)制,對(duì)客戶端屏蔽了故障轉(zhuǎn)移和切分機(jī)制。
5、可靠性(持久化)
對(duì)于數(shù)據(jù)持久化和數(shù)據(jù)恢復(fù),
redis支持(快照、AOF):依賴快照進(jìn)行持久化,aof增強(qiáng)了可靠性的同時(shí),對(duì)性能有所影響
memcache不支持,通常用在做緩存,提升性能;
MongoDB從1.8版本開始采用binlog方式支持持久化的可靠性
6、數(shù)據(jù)一致性(事務(wù)支持)
Memcache 在并發(fā)場(chǎng)景下,用cas保證一致性
redis事務(wù)支持比較弱,只能保證事務(wù)中的每個(gè)操作連續(xù)執(zhí)行
mongoDB不支持事務(wù)
7、數(shù)據(jù)分析
mongoDB內(nèi)置了數(shù)據(jù)分析的功能(mapreduce),其他不支持
8、應(yīng)用場(chǎng)景
redis:數(shù)據(jù)量較小的更性能操作和運(yùn)算上
memcache:用于在動(dòng)態(tài)系統(tǒng)中減少數(shù)據(jù)庫負(fù)載,提升性能;做緩存,提高性能(適合讀多寫少,對(duì)于數(shù)據(jù)量比較大,可以采用sharding)
MongoDB:主要解決海量數(shù)據(jù)的訪問效率問題