王 騰騰 和 邵 兵
2015 年 11 月 26 日發(fā)布
WeiboGoogle+用電子郵件發(fā)送本頁面
Comments 1
引子
隨著云時(shí)代的來臨,大數(shù)據(jù)(Big data)也獲得了越來越多的關(guān)注。著云臺的分析師團(tuán)隊(duì)認(rèn)為,大數(shù)據(jù)(Big data)通常用來形容一個(gè)公司創(chuàng)造的大量非結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù),這些數(shù)據(jù)在下載到關(guān)系型數(shù)據(jù)庫用于分析時(shí)會花費(fèi)過多時(shí)間和金錢。大數(shù)據(jù)分析常和云計(jì)算聯(lián)系到一起,因?yàn)閷?shí)時(shí)的大型數(shù)據(jù)集分析需要像 MapReduce 一樣的框架來向數(shù)十、數(shù)百或甚至數(shù)千的電腦分配工作。
“大數(shù)據(jù)”在互聯(lián)網(wǎng)行業(yè)指的是這樣一種現(xiàn)象:互聯(lián)網(wǎng)公司在日常運(yùn)營中生成、累積的用戶網(wǎng)絡(luò)行為數(shù)據(jù)。這些數(shù)據(jù)的規(guī)模是如此龐大,以至于不能用 G 或 T 來衡量。所以如何高效的處理分析大數(shù)據(jù)的問題擺在了面前。對于大數(shù)據(jù)的處理優(yōu)化方式有很多種,本文中主要介紹在使用 Hadoop 平臺中對數(shù)據(jù)進(jìn)行壓縮處理來提高數(shù)據(jù)處理效率。
壓縮簡介
Hadoop 作為一個(gè)較通用的海量數(shù)據(jù)處理平臺,每次運(yùn)算都會需要處理大量數(shù)據(jù),我們會在 Hadoop 系統(tǒng)中對數(shù)據(jù)進(jìn)行壓縮處理來優(yōu)化磁盤使用率,提高數(shù)據(jù)在磁盤和網(wǎng)絡(luò)中的傳輸速度,從而提高系統(tǒng)處理數(shù)據(jù)的效率。在使用壓縮方式方面,主要考慮壓縮速度和壓縮文件的可分割性。綜合所述,使用壓縮的優(yōu)點(diǎn)如下:
1. 節(jié)省數(shù)據(jù)占用的磁盤空間;
2. 加快數(shù)據(jù)在磁盤和網(wǎng)絡(luò)中的傳輸速度,從而提高系統(tǒng)的處理速度。
壓縮格式
Hadoop 對于壓縮格式的是自動識別。如果我們壓縮的文件有相應(yīng)壓縮格式的擴(kuò)展名(比如 lzo,gz,bzip2 等)。Hadoop 會根據(jù)壓縮格式的擴(kuò)展名自動選擇相對應(yīng)的解碼器來解壓數(shù)據(jù),此過程完全是 Hadoop 自動處理,我們只需要確保輸入的壓縮文件有擴(kuò)展名。
Hadoop 對每個(gè)壓縮格式的支持, 詳細(xì)見下表:
表 1. 壓縮格式
壓縮格式 工具 算法 擴(kuò)展名 多文件 可分割性
DEFLATE 無 DEFLATE .deflate 不 不
GZIP gzip DEFLATE .gzp 不 不
ZIP zip DEFLATE .zip 是 是,在文件范圍內(nèi)
BZIP2 bzip2 BZIP2 .bz2 不 是
LZO lzop LZO .lzo 不 是
如果壓縮的文件沒有擴(kuò)展名,則需要在執(zhí)行 MapReduce 任務(wù)的時(shí)候指定輸入格式。
1
2
3
4
5
hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/
hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper /
usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/
reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 -
jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat*
性能對比
Hadoop 下各種壓縮算法的壓縮比,壓縮時(shí)間,解壓時(shí)間見下表:
表 2. 性能對比
壓縮算法 原始文件大小 壓縮文件大小 壓縮速度 解壓速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
因此我們可以得出:
1) Bzip2 壓縮效果明顯是最好的,但是 bzip2 壓縮速度慢,可分割。
2) Gzip 壓縮效果不如 Bzip2,但是壓縮解壓速度快,不支持分割。
3) LZO 壓縮效果不如 Bzip2 和 Gzip,但是壓縮解壓速度最快!并且支持分割!
這里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它會影響到在執(zhí)行作業(yè)時(shí) Map 啟動的個(gè)數(shù),從而會影響到作業(yè)的執(zhí)行效率!
所有的壓縮算法都顯示出一種時(shí)間空間的權(quán)衡,更快的壓縮和解壓速度通常會耗費(fèi)更多的空間。在選擇使用哪種壓縮格式時(shí),我們應(yīng)該根據(jù)自身的業(yè)務(wù)需求來選擇。
下圖是在本地壓縮與通過流將壓縮結(jié)果上傳到 BI 的時(shí)間對比。
圖 1. 時(shí)間對比
圖 1. 時(shí)間對比
使用方式
MapReduce 可以在三個(gè)階段中使用壓縮。
1. 輸入壓縮文件。如果輸入的文件是壓縮過的,那么在被 MapReduce 讀取時(shí),它們會被自動解壓。
2.MapReduce 作業(yè)中,對 Map 輸出的中間結(jié)果集壓縮。實(shí)現(xiàn)方式如下:
1)可以在 core-site.xml 文件中配置,代碼如下
圖 2. core-site.xml 代碼示例
圖 2. core-site.xml 代碼示例
2)使用 Java 代碼指定
1
2
conf.setCompressMapOut(true);
conf.setMapOutputCompressorClass(GzipCode.class);
最后一行代碼指定 Map 輸出結(jié)果的編碼器。
3.MapReduce 作業(yè)中,對 Reduce 輸出的最終結(jié)果集壓。實(shí)現(xiàn)方式如下:
1)可以在 core-site.xml 文件中配置,代碼如下
圖 3. core-site.xml 代碼示例
圖 3. core-site.xml 代碼示例
2)使用 Java 代碼指定
1
2
conf.setBoolean(“mapred.output.compress”,true);
conf.setClass(“mapred.output.compression.codec”,GzipCode.class,CompressionCodec.class);
最后一行同樣指定 Reduce 輸出結(jié)果的編碼器。
壓縮框架
我們前面已經(jīng)提到過關(guān)于壓縮的使用方式,其中第一種就是將壓縮文件直接作為入口參數(shù)交給 MapReduce 處理,MapReduce 會自動根據(jù)壓縮文件的擴(kuò)展名來自動選擇合適解壓器處理數(shù)據(jù)。那么到底是怎么實(shí)現(xiàn)的呢?如下圖所示:
圖 4. 壓縮實(shí)現(xiàn)情形
圖 4. 壓縮實(shí)現(xiàn)情形
我們在配置 Job 作業(yè)的時(shí)候,會設(shè)置數(shù)據(jù)輸入的格式化方式,使用 conf.setInputFormat() 方法,這里的入口參數(shù)是 TextInputFormat.class。
TextInputFormat.class 繼承于 InputFormat.class,主要用于對數(shù)據(jù)進(jìn)行兩方面的預(yù)處理。一是對輸入數(shù)據(jù)進(jìn)行切分,生成一組 split,一個(gè) split 會分發(fā)給一個(gè) mapper 進(jìn)行處理;二是針對每個(gè) split,再創(chuàng)建一個(gè) RecordReader 讀取 split 內(nèi)的數(shù)據(jù),并按照
的形式組織成一條 record 傳給 map 函數(shù)進(jìn)行處理。此類在對數(shù)據(jù)進(jìn)行切分之前,會首先初始化壓縮解壓工程類 CompressionCodeFactory.class,通過工廠獲取實(shí)例化的編碼解碼器 CompressionCodec 后對數(shù)據(jù)處理操作。
下面我們來詳細(xì)的看一下從壓縮工廠獲取編碼解碼器的過程。
壓縮解壓工廠類 CompressionCodecFactory
壓縮解壓工廠類 CompressionCodeFactory.class 主要功能就是負(fù)責(zé)根據(jù)不同的文件擴(kuò)展名來自動獲取相對應(yīng)的壓縮解壓器 CompressionCodec.class,是整個(gè)壓縮框架的核心控制器。我們來看下 CompressionCodeFactory.class 中的幾個(gè)重要方法:
1. 初始化方法
圖 5. 代碼示例
圖 5. 代碼示例
① getCodeClasses(conf) 負(fù)責(zé)獲取關(guān)于編碼解碼器 CompressionCodec.class 的配置信息。下面將會詳細(xì)講解。
② 默認(rèn)添加兩種編碼解碼器。當(dāng) getCodeClass(conf) 方法沒有讀取到相關(guān)的編碼解碼器 CompressionCodec.class 的配置信息時(shí),系統(tǒng)會默認(rèn)添加兩種編碼解碼器 CompressionCodec.class,分別是 GzipCode.class 和 DefaultCode.class。
③ addCode(code) 此方法用于將編碼解碼器 CompressionCodec.class 添加到系統(tǒng)緩存中。下面將會詳細(xì)講解。
2. getCodeClasses(conf)
圖 6. 代碼示例
圖 6. 代碼示例
① 這里我們可以看,系統(tǒng)讀取關(guān)于編碼解碼器 CompressionCodec.class 的配置信息在 core-site.xml 中 io.compression.codes 下。我們看下這段配置文件,如下圖所示:
圖 7. 代碼示例
圖 7. 代碼示例
Value 標(biāo)簽中是每個(gè)編碼解碼 CompressionCodec.class 的完整路徑,中間用逗號分隔。我們只需要將自己需要使用到的編碼解碼配置到此屬性中,系統(tǒng)就會自動加載到緩存中。
除了上述的這種方式以外,Hadoop 為我們提供了另一種加載方式:代碼加載。同樣最終將信息配置在 io.compression.codes 屬性中,代碼如下:
1
2
conf.set("io.compression.codecs","org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");)
3. addCode(code) 方法添加編碼解碼器
圖 8. 代碼示例
圖 8. 代碼示例
addCodec(codec) 方法入口參數(shù)是個(gè)編碼解碼器 CompressionCodec.class,這里我們會首先接觸到它的一個(gè)方法。
① codec.getDefaultExtension() 方法看方法名的字面意思我們就可以知道,此方法用于獲取此編碼解碼所對應(yīng)文件的擴(kuò)展名,比如,文件名是 xxxx.gz2,那么這個(gè)方法的返回值就是“.bz2”,我們來看下 org.apache.hadoop.io.compress.BZip2Codec 此方法的實(shí)現(xiàn)代碼:
圖 9. 代碼示例
圖 9. 代碼示例
② Codecs 是一個(gè) SortedMap 的示例。這里有個(gè)很有意思的地方,它將 Key 值,也就是通過 codec.getDefaultExtension() 方法獲取到的文件擴(kuò)展名進(jìn)行了翻轉(zhuǎn),舉個(gè)例子,比如文件名擴(kuò)展名“.bz2”,將文件名翻轉(zhuǎn)之后就變成了“2zb.”。
系統(tǒng)加載完所有的編碼解碼器后,我們可以得到這樣一個(gè)有序映射表,如下:
圖 10. 代碼示例
圖 10. 代碼示例
現(xiàn)在編碼解碼器都有了,我們怎么得到對應(yīng)的編碼解碼器呢?看下面這個(gè)方法。
4. getCodec() 方法
此方法用于獲取文件所對應(yīng)的的編碼解碼器 CompressionCodec.class。
圖 11. 代碼示例
圖 11. 代碼示例
getCodec(Path) 方法的輸入?yún)?shù)是 Path 對象,保存著文件路徑。
① 將文件名翻轉(zhuǎn)。如 xxxx.bz2 翻轉(zhuǎn)成 2zb.xxxx。
② 獲取 codecs 集合中最接近 2zb.xxxx 的值。此方法有返回值同樣是個(gè) SortMap 對象。
在這里對返回的 SortMap 對象進(jìn)行第二次篩選。
編碼解碼器 CompressionCodec
剛剛在介紹壓縮解壓工程類 CompressionCodeFactory.class 的時(shí)候,我們多次提到了壓縮解壓器 CompressionCodecclass,并且我們在上文中還提到了它其中的一個(gè)用于獲取文件擴(kuò)展名的方法 getDefaultExtension()。
壓縮解壓工程類 CompressionCodeFactory.class 使用的是抽象工廠的設(shè)計(jì)模式。它是一個(gè)接口,制定了一系列方法,用于創(chuàng)建特定壓縮解壓算法。下面我們來看下比較重要的幾個(gè)方法:
1. createOutputStream() 方法對數(shù)據(jù)流進(jìn)行壓縮。
圖 12. 代碼示例
圖 12. 代碼示例
此方法提供了方法重載。
① 基于流的壓縮處理;
② 基于壓縮機(jī) Compress.class 的壓縮處理
2. createInputStream() 方法對數(shù)據(jù)流進(jìn)行解壓。
圖 13. 代碼示例
圖 13. 代碼示例
這里的解壓方法同樣提供了方法重載。
① 基于流的解壓處理;
② 基于解壓機(jī) Decompressor.class 的解壓處理;
關(guān)于壓縮/解壓流與壓縮/解壓機(jī)會在下面的文章中我們會詳細(xì)講解。此處暫作了解。
3. getCompressorType() 返回需要的編碼器的類型。
getDefaultExtension() 獲取對應(yīng)文件擴(kuò)展名的方法。前文已提到過,不再敖述。
壓縮機(jī) Compressor 和解壓機(jī) Decompressor
前面在編碼解碼器部分的 createInputStream() 和 createInputStream() 方法中我們提到過 Compressor.class 和 Decompressor.class 對象。在 Hadoop 的實(shí)現(xiàn)中,數(shù)據(jù)編碼器和解碼器被抽象成了兩個(gè)接口:
1. org.apache.hadoop.io.compress.Compressor;
2. org.apache.hadoop.io.compress.Decompressor;
它們規(guī)定了一系列的方法,所以在 Hadoop 內(nèi)部的編碼/解碼算法實(shí)現(xiàn)都需要實(shí)現(xiàn)對應(yīng)的接口。在實(shí)際的數(shù)據(jù)壓縮與解壓縮過程,Hadoop 為用戶提供了統(tǒng)一的 I/O 流處理模式。
我們看一下壓縮機(jī) Compressor.class,代碼如下:
圖 14. 代碼示例
圖 14. 代碼示例
① setInput() 方法接收數(shù)據(jù)到內(nèi)部緩沖區(qū),可以多次調(diào)用;
② needsInput() 方法用于檢查緩沖區(qū)是否已滿。如果是 false 則說明當(dāng)前的緩沖區(qū)已滿;
③ getBytesRead() 輸入未壓縮字節(jié)的總數(shù);
④ getBytesWritten() 輸出壓縮字節(jié)的總數(shù);
⑤ finish() 方法結(jié)束數(shù)據(jù)輸入的過程;
⑥ finished() 方法用于檢查是否已經(jīng)讀取完所有的等待壓縮的數(shù)據(jù)。如果返回 false,表明壓縮器中還有未讀取的壓縮數(shù)據(jù),可以繼續(xù)通過 compress() 方法讀取;
⑦ compress() 方法獲取壓縮后的數(shù)據(jù),釋放緩沖區(qū)空間;
⑧ reset() 方法用于重置壓縮器,以處理新的輸入數(shù)據(jù)集合;
⑨ end() 方法用于關(guān)閉解壓縮器并放棄所有未處理的輸入;
⑩ reinit() 方法更進(jìn)一步允許使用 Hadoop 的配置系統(tǒng),重置并重新配置壓縮器;
為了提高壓縮效率,并不是每次用戶調(diào)用 setInput() 方法,壓縮機(jī)就會立即工作,所以,為了通知壓縮機(jī)所有數(shù)據(jù)已經(jīng)寫入,必須使用 finish() 方法。finish() 調(diào)用結(jié)束后,壓縮機(jī)緩沖區(qū)中保持的已經(jīng)壓縮的數(shù)據(jù),可以繼續(xù)通過 compress() 方法獲得。至于要判斷壓縮機(jī)中是否還有未讀取的壓縮數(shù)據(jù),則需要利用 finished() 方法來判斷。
壓縮流 CompressionOutputStream 和解壓縮流 CompressionInputStream
前文編碼解碼器部分提到過 createInputStream() 方法返回 CompressionOutputStream 對象,createInputStream() 方法返回 CompressionInputStream 對象。這兩個(gè)類分別繼承自 java.io.OutputStream 和 java.io.InputStream。從而我們不難理解,這兩個(gè)對象的作用了吧。
我們來看下 CompressionInputStream.class 的代碼:
圖 15. 代碼示例
圖 15. 代碼示例
可以看到 CompressionOutputStream 實(shí)現(xiàn)了 OutputStream 的 close() 方法和 flush() 方法,但用于輸出數(shù)據(jù)的 write() 方法以及用于結(jié)束壓縮過程并將輸入寫到底層流的 finish() 方法和重置壓縮狀態(tài)的 resetState() 方法還是抽象方法,需要 CompressionOutputStream 的子類實(shí)現(xiàn)。
Hadoop 壓縮框架中為我們提供了一個(gè)實(shí)現(xiàn)了 CompressionOutputStream 類通用的子類 CompressorStream.class。
圖 16. 代碼示例
圖 16. 代碼示例
CompressorStream.class 提供了三個(gè)不同的構(gòu)造函數(shù),CompressorStream 需要的底層輸出流 out 和壓縮時(shí)使用的壓縮器,都作為參數(shù)傳入構(gòu)造函數(shù)。另一個(gè)參數(shù)是 CompressorStream 工作時(shí)使用的緩沖區(qū) buffer 的大小,構(gòu)造時(shí)會利用這個(gè)參數(shù)分配該緩沖區(qū)。第一個(gè)可以手動設(shè)置緩沖區(qū)大小,第二個(gè)默認(rèn) 512,第三個(gè)沒有緩沖區(qū)且不可使用壓縮器。
圖 17. 代碼示例
圖 17. 代碼示例
在 write()、compress()、finish() 以及 resetState() 方法中,我們發(fā)現(xiàn)了壓縮機(jī) Compressor 的身影,前面文章我們已經(jīng)介紹過壓縮機(jī)的的實(shí)現(xiàn)過程,通過調(diào)用 setInput() 方法將待壓縮數(shù)據(jù)填充到內(nèi)部緩沖區(qū),然后調(diào)用 needsInput() 方法檢查緩沖區(qū)是否已滿,如果緩沖區(qū)已滿,將調(diào)用 compress() 方法對數(shù)據(jù)進(jìn)行壓縮。流程如下圖所示:
圖 18. 調(diào)用流程圖
圖 18. 調(diào)用流程圖
結(jié)束語
本文深入到 Hadoop 平臺壓縮框架內(nèi)部,對其核心代碼以及各壓縮格式的效率進(jìn)行對比分析,以幫助讀者在使用 Hadoop 平臺時(shí),可以通過對數(shù)據(jù)進(jìn)行壓縮處理來提高數(shù)據(jù)處理效率。當(dāng)再次面臨海量數(shù)據(jù)處理時(shí), Hadoop 平臺的壓縮機(jī)制可以讓我們事半功倍。
相關(guān)主題
Hadoop 在線 API
《Hadoop 技術(shù)內(nèi)幕深入解析 HADOOP COMMON 和 HDFS 架構(gòu)設(shè)計(jì)與實(shí)現(xiàn)原理》
developerWorks 開源技術(shù)主題:查找豐富的操作信息、工具和項(xiàng)目更新,幫助您掌握開源技術(shù)并將其用于 IBM 產(chǎn)品。
posted on 2017-09-14 17:35
xzc 閱讀(560)
評論(0) 編輯 收藏 所屬分類:
hadoop