一、分布式計算開源框架Hadoop實踐
在SIP項目設(shè)計的過程中,對于它龐大的日志在開始時就考慮使用任務(wù)分解的多線程處理模式來分析統(tǒng)計,在我從前寫的文章《Tiger Concurrent Practice --日志分析并行分解設(shè)計與實現(xiàn)》中有所提到。但是由于統(tǒng)計的內(nèi)容暫時還是十分簡單,所以就采用Memcache作為計數(shù)器,結(jié)合MySQL就完成了訪問控制以及統(tǒng)計的工作。然而未來,對于海量日志分析的工作,還是需要有所準備。現(xiàn)在最火的技術(shù)詞匯莫過于“云計算”,在Open API日益盛行的今天,互聯(lián)網(wǎng)應(yīng)用的數(shù)據(jù)將會越來越有價值,如何去分析這些數(shù)據(jù),挖掘其內(nèi)在價值,就需要分布式計算來支撐海量數(shù)據(jù)的分析工作。
回過頭來看,早先那種多線程,多任務(wù)分解的日志分析設(shè)計,其實是分布式計算的一個單機版縮略,如何將這種單機的工作進行分拆,變成協(xié)同工作的集群,其實就是分布式計算框架設(shè)計所涉及的。在去年參加BEA大會的時候,BEA和VMWare合作采用虛擬機來構(gòu)建集群,無非就是希望使得計算機硬件能夠類似于應(yīng)用程序中資源池的資源,使用者無需關(guān)心資源的分配情況,從而最大化了硬件資源的使用價值。分布式計算也是如此,具體的計算任務(wù)交由哪一臺機器執(zhí)行,執(zhí)行后由誰來匯總,這都由分布式框架的Master來抉擇,而使用者只需簡單地將待分析內(nèi)容提供給分布式計算系統(tǒng)作為輸入,就可以得到分布式計算后的結(jié)果。
Hadoop是Apache開源組織的一個分布式計算開源框架,在很多大型網(wǎng)站上都已經(jīng)得到了應(yīng)用,如亞馬遜、Facebook和Yahoo等等。 對于我來說,最近的一個使用點就是服務(wù)集成平臺的日志分析。服務(wù)集成平臺的日志量將會很大,而這也正好符合了分布式計算的適用場景(日志分析和索引建立就 是兩大應(yīng)用場景)。
當(dāng)前沒有正式確定使用,所以也是自己業(yè)余摸索,后續(xù)所寫的相關(guān)內(nèi)容,都是一個新手的學(xué)習(xí)過程,難免會有一些錯誤,只是希望記錄下來可以分享給更多志同道合的朋友。
什么是Hadoop?
搞什么東西之前,第一步是要知道What(是什么),然后是Why(為什么),最后才是How(怎么做)。但很多開發(fā)的朋友在做了多年項目以后,都習(xí)慣是先How,然后What,最后才是Why,這樣只會讓自己變得浮躁,同時往往會將技術(shù)誤用于不適合的場景。
Hadoop框架中最核心的設(shè)計就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的, 簡單的一句話解釋MapReduce就是“任務(wù)的分解與結(jié)果的匯總”。HDFS是Hadoop分布式文件系統(tǒng)(Hadoop Distributed File System)的縮寫,為分布式計算存儲提供了底層支持。
MapReduce從它名字上來看就大致可以看出個緣由,兩個動詞Map和Reduce,“Map(展開)”就是將一個任務(wù)分解成為多個任 務(wù),“Reduce”就是將分解后多任務(wù)處理的結(jié)果匯總起來,得出最后的分析結(jié)果。這不是什么新思想,其實在前面提到的多線程,多任務(wù)的設(shè)計就可以找到這 種思想的影子。不論是現(xiàn)實社會,還是在程序設(shè)計中,一項工作往往可以被拆分成為多個任務(wù),任務(wù)之間的關(guān)系可以分為兩種:一種是不相關(guān)的任務(wù),可以并行執(zhí) 行;另一種是任務(wù)之間有相互的依賴,先后順序不能夠顛倒,這類任務(wù)是無法并行處理的。回到大學(xué)時期,教授上課時讓大家去分析關(guān)鍵路徑,無非就是找最省時的 任務(wù)分解執(zhí)行方式。在分布式系統(tǒng)中,機器集群就可以看作硬件資源池,將并行的任務(wù)拆分,然后交由每一個空閑機器資源去處理,能夠極大地提高計算效率,同時 這種資源無關(guān)性,對于計算集群的擴展無疑提供了最好的設(shè)計保證。(其實我一直認為Hadoop的卡通圖標不應(yīng)該是一個小象,應(yīng)該是螞蟻,分布式計算就好比 螞蟻吃大象,廉價的機器群可以匹敵任何高性能的計算機,縱向擴展的曲線始終敵不過橫向擴展的斜線)。任務(wù)分解處理以后,那就需要將處理以后的結(jié)果再匯總起 來,這就是Reduce要做的工作。

圖1:MapReduce結(jié)構(gòu)示意圖
上圖就是MapReduce大致的結(jié)構(gòu)圖,在Map前還可能會對輸入的數(shù)據(jù)有Split(分割)的過程,保證任務(wù)并行效率,在Map之后還會有Shuffle(混合)的過程,對于提高Reduce的效率以及減小數(shù)據(jù)傳輸?shù)膲毫τ泻艽蟮膸椭:竺鏁唧w提及這些部分的細節(jié)。
HDFS是分布式計算的存儲基石,Hadoop的分布式文件系統(tǒng)和其他分布式文件系統(tǒng)有很多類似的特質(zhì)。分布式文件系統(tǒng)基本的幾個特點:
- 對于整個集群有單一的命名空間。
- 數(shù)據(jù)一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創(chuàng)建之前無法看到文件存在。
- 文件會被分割成多個文件塊,每個文件塊被分配存儲到數(shù)據(jù)節(jié)點上,而且根據(jù)配置會由復(fù)制文件塊來保證數(shù)據(jù)的安全性。

圖2:HDFS結(jié)構(gòu)示意圖
上圖中展現(xiàn)了整個HDFS三個重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系統(tǒng)中的管理 者,主要負責(zé)管理文件系統(tǒng)的命名空間、集群配置信息和存儲塊的復(fù)制等。NameNode會將文件系統(tǒng)的Meta-data存儲在內(nèi)存中,這些信息主要包括 了文件信息、每一個文件對應(yīng)的文件塊的信息和每一個文件塊在DataNode的信息等。DataNode是文件存儲的基本單元,它將Block存儲在本地 文件系統(tǒng)中,保存了Block的Meta-data,同時周期性地將所有存在的Block信息發(fā)送給NameNode。Client就是需要獲取分布式文 件系統(tǒng)文件的應(yīng)用程序。這里通過三個操作來說明他們之間的交互關(guān)系。
文件寫入:
- Client向NameNode發(fā)起文件寫入的請求。
- NameNode根據(jù)文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。
- Client將文件劃分為多個Block,根據(jù)DataNode的地址信息,按順序?qū)懭氲矫恳粋€DataNode塊中。
文件讀取:
- Client向NameNode發(fā)起文件讀取的請求。
- NameNode返回文件存儲的DataNode的信息。
- Client讀取文件信息。
文件Block復(fù)制:
- NameNode發(fā)現(xiàn)部分文件的Block不符合最小復(fù)制數(shù)或者部分DataNode失效。
- 通知DataNode相互復(fù)制Block。
- DataNode開始直接相互復(fù)制。
最后再說一下HDFS的幾個設(shè)計特點(對于框架設(shè)計值得借鑒):
- Block的放置:默認不配置。一個Block會有三份備份,一份放在NameNode指定的DataNode,另一份放在 與指定DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了 數(shù)據(jù)安全,考慮同一Rack的失敗情況以及不同Rack之間數(shù)據(jù)拷貝性能問題就采用這種配置方式。
- 心跳檢測DataNode的健康狀況,如果發(fā)現(xiàn)問題就采取數(shù)據(jù)備份的方式來保證數(shù)據(jù)的安全性。
- 數(shù) 據(jù)復(fù)制(場景為DataNode失敗、需要平衡DataNode的存儲利用率和需要平衡DataNode數(shù)據(jù)交互壓力等情況):這里先說一下,使用 HDFS的balancer命令,可以配置一個Threshold來平衡每一個DataNode磁盤利用率。例如設(shè)置了Threshold為10%,那么 執(zhí)行balancer命令的時候,首先統(tǒng)計所有DataNode的磁盤利用率的均值,然后判斷如果某一個DataNode的磁盤利用率超過這個均值 Threshold以上,那么將會把這個DataNode的block轉(zhuǎn)移到磁盤利用率低的DataNode,這對于新節(jié)點的加入來說十分有用。
- 數(shù)據(jù)交驗:采用CRC32作數(shù)據(jù)交驗。在文件Block寫入的時候除了寫入數(shù)據(jù)還會寫入交驗信息,在讀取的時候需要交驗后再讀入。
- NameNode是單點:如果失敗的話,任務(wù)處理信息將會紀錄在本地文件系統(tǒng)和遠端的文件系統(tǒng)中。
- 數(shù) 據(jù)管道性的寫入:當(dāng)客戶端要寫入文件到DataNode上,首先客戶端讀取一個Block然后寫到第一個DataNode上,然后由第一個 DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個Block的NataNode都成功寫入,客戶端才會繼續(xù)開始寫下一個 Block。
- 安全模式:在分布式文件系統(tǒng)啟動的時候,開始的時候會有安全模式,當(dāng)分布式文件系統(tǒng)處于安全模式的情況下,文 件系統(tǒng)中的內(nèi)容不允許修改也不允許刪除,直到安全模式結(jié)束。安全模式主要是為了系統(tǒng)啟動的時候檢查各個DataNode上數(shù)據(jù)塊的有效性,同時根據(jù)策略必 要的復(fù)制或者刪除部分數(shù)據(jù)塊。運行期通過命令也可以進入安全模式。在實踐過程中,系統(tǒng)啟動的時候去修改和刪除文件也會有安全模式不允許修改的出錯提示,只需要等待一會兒即可。
下面綜合MapReduce和HDFS來看Hadoop的結(jié)構(gòu):

圖3:Hadoop結(jié)構(gòu)示意圖
在Hadoop的系統(tǒng)中,會有一臺Master,主要負責(zé)NameNode的工作以及JobTracker的工作。JobTracker的主要職責(zé) 就是啟動、跟蹤和調(diào)度各個Slave的任務(wù)執(zhí)行。還會有多臺Slave,每一臺Slave通常具有DataNode的功能并負責(zé)TaskTracker的 工作。TaskTracker根據(jù)應(yīng)用要求來結(jié)合本地數(shù)據(jù)執(zhí)行Map任務(wù)以及Reduce任務(wù)。
說到這里,就要提到分布式計算最重要的一個設(shè)計點:Moving Computation is Cheaper than Moving Data。就是在分布式處理中,移動數(shù)據(jù)的代價總是高于轉(zhuǎn)移計算的代價。簡單來說就是分而治之的工作,需要將數(shù)據(jù)也分而存儲,本地任務(wù)處理本地數(shù)據(jù)然后歸 總,這樣才會保證分布式計算的高效性。
為什么要選擇Hadoop?
說完了What,簡單地說一下Why。官方網(wǎng)站已經(jīng)給了很多的說明,這里就大致說一下其優(yōu)點及使用的場景(沒有不好的工具,只用不適用的工具,因此選擇好場景才能夠真正發(fā)揮分布式計算的作用):
- 可擴展:不論是存儲的可擴展還是計算的可擴展都是Hadoop的設(shè)計根本。
- 經(jīng)濟:框架可以運行在任何普通的PC上。
- 可靠:分布式文件系統(tǒng)的備份恢復(fù)機制以及MapReduce的任務(wù)監(jiān)控保證了分布式處理的可靠性。
- 高效:分布式文件系統(tǒng)的高效數(shù)據(jù)交互實現(xiàn)以及MapReduce結(jié)合Local Data處理的模式,為高效處理海量的信息作了基礎(chǔ)準備。
使用場景:個人覺得最適合的就是海量數(shù)據(jù)的分析,其實Google最早提出MapReduce也就是為了海量數(shù)據(jù)分析。同時HDFS最早是為了搜索引擎實現(xiàn)而開發(fā)的,后來才被用于分布式計算框架中。海量數(shù)據(jù)被分割于多個節(jié)點,然后由每一個節(jié)點并行計算,將得出的結(jié) 果歸并到輸出。同時第一階段的輸出又可以作為下一階段計算的輸入,因此可以想象到一個樹狀結(jié)構(gòu)的分布式計算圖,在不同階段都有不同產(chǎn)出,同時并行和串行結(jié) 合的計算也可以很好地在分布式集群的資源下得以高效的處理。
二、Hadoop中的集群配置和使用技巧
其實參看Hadoop官方文檔已經(jīng)能夠很容易配置分布式框架運行環(huán)境了,不過這里既然寫了就再多寫一點,同時有一些細節(jié)需要注意的也說明一下,其實 也就是這些細節(jié)會讓人摸索半天。Hadoop可以單機跑,也可以配置集群跑,單機跑就不需要多說了,只需要按照Demo的運行說明直接執(zhí)行命令即可。這里 主要重點說一下集群配置運行的過程。
環(huán)境
7臺普通的機器,操作系統(tǒng)都是Linux。內(nèi)存和CPU就不說了,反正Hadoop一大特點就是機器在多不在精。JDK必須是1.5以上的,這個切記。7臺機器的機器名務(wù)必不同,后續(xù)會談到機器名對于MapReduce有很大的影響。
部署考慮
正如上面我描述的,對于Hadoop的集群來說,可以分成兩大類角色:Master和Slave,前者主要配置NameNode和 JobTracker的角色,負責(zé)總管分布式數(shù)據(jù)和分解任務(wù)的執(zhí)行,后者配置DataNode和TaskTracker的角色,負責(zé)分布式數(shù)據(jù)存儲以及任 務(wù)的執(zhí)行。本來我打算看看一臺機器是否可以配置成Master,同時也作為Slave使用,不過發(fā)現(xiàn)在NameNode初始化的過程中以及 TaskTracker執(zhí)行過程中機器名配置好像有沖突(NameNode和TaskTracker對于Hosts的配置有些沖突,究竟是把機器名對應(yīng) IP放在配置前面還是把Localhost對應(yīng)IP放在前面有點問題,不過可能也是我自己的問題吧,這個大家可以根據(jù)實施情況給我反饋)。最后反正決定一 臺Master,六臺Slave,后續(xù)復(fù)雜的應(yīng)用開發(fā)和測試結(jié)果的比對會增加機器配置。
實施步驟
- 在所有的機器上都建立相同的目錄,也可以就建立相同的用戶,以該用戶的home路徑來做hadoop的安裝路徑。例如我在所有的機器上都建立了
/home/wenchu
。
- 下載Hadoop,先解壓到Master上。這里我是下載的0.17.1的版本。此時Hadoop的安裝路徑就是
/home/wenchu/hadoop-0.17.1
。
- 解壓后進入conf目錄,主要需要修改以下文件:
hadoop-env.sh
,hadoop-site.xml
、masters
、slaves
。
Hadoop的基礎(chǔ)配置文件是hadoop-default.xml
,看Hadoop的代碼可以知道,默認建立一個Job的時候會建立Job的Config,Config首先讀入hadoop-default.xml
的配置,然后再讀入hadoop-site.xml
的配置(這個文件初始的時候配置為空),hadoop-site.xml
中主要配置你需要覆蓋的hadoop-default.xml
的系統(tǒng)級配置,以及你需要在你的MapReduce過程中使用的自定義配置(具體的一些使用例如final等參考文檔)。
以下是一個簡單的hadoop-site.xml
的配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>//你的namenode的配置,機器名加端口
<value>hdfs://10.2.224.46:54310/</value>
</property>
<property>
<name>mapred.job.tracker</name>//你的JobTracker的配置,機器名加端口
<value>hdfs://10.2.224.46:54311/</value>
</property>
<property>
<name>dfs.replication</name>//數(shù)據(jù)需要備份的數(shù)量,默認是三
<value>1</value>
</property>
<property>
<name>hadoop.tmp.dir</name>//Hadoop的默認臨時路徑,這個最好配置,如果在新
增節(jié)點或者其他情況下莫名其妙的DataNode啟動不了,就刪除此文件中的tmp目錄即可。
不過如果刪除了NameNode機器的此目錄,那么就需要重新執(zhí)行NameNode格式化的命令。
<value>/home/wenchu/hadoop/tmp/</value>
</property>
<property>
<name>mapred.child.java.opts</name>//java虛擬機的一些參數(shù)可以參照配置
<value>-Xmx512m</value>
</property>
<property>
<name>dfs.block.size</name>//block的大小,單位字節(jié),后面會提到用處,必須是
512的倍數(shù),因為采用crc作文件完整性校驗,默認配置512是checksum的最小單元。
<value>5120000</value>
<description>The default block size for new files.</description>
</property>
</configuration>
hadoop-env.sh
文件只需要修改一個參數(shù):
# The java implementation to use. Required.
export JAVA_HOME=/usr/ali/jdk1.5.0_10
配置你的Java路徑,記住一定要1.5版本以上,免得莫名其妙出現(xiàn)問題。
Masters中配置Masters的IP或者機器名,如果是機器名那么需要在/etc/hosts
中有所設(shè)置。Slaves中配置的是Slaves的IP或者機器名,同樣如果是機器名需要在/etc/hosts
中有所設(shè)置。范例如下,我這里配置的都是IP:
Masters:
10.2.224.46
Slaves:
10.2.226.40
10.2.226.39
10.2.226.38
10.2.226.37
10.2.226.41
10.2.224.36
- 建立Master到每一臺Slave的SSH受信證書。由于Master將會通過SSH啟動所有Slave的Hadoop,所以需要建立單向或者雙向證書保證命令執(zhí)行時不需要再輸入密碼。在Master和所有的Slave機器上執(zhí)行:
ssh-keygen -t rsa
。執(zhí)行此命令的時候,看到提示只需要回車。然后就會在/root/.ssh/
下面產(chǎn)生id_rsa.pub
的證書文件,通過scp將Master機器上的這個文件拷貝到Slave上(記得修改名稱),例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub
,然后執(zhí)行cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_keys
,建立authorized_keys
文 件即可,可以打開這個文件看看,也就是rsa的公鑰作為key,user@IP作為value。此時可以試驗一下,從master ssh到slave已經(jīng)不需要密碼了。由slave反向建立也是同樣。為什么要反向呢?其實如果一直都是Master啟動和關(guān)閉的話那么沒有必要建立反 向,只是如果想在Slave也可以關(guān)閉Hadoop就需要建立反向。
- 將Master上的Hadoop通過scp拷貝到每一個Slave相同的目錄下,根據(jù)每一個Slave的
Java_HOME
的不同修改其hadoop-env.sh
。
- 修改Master上
/etc/profile:
新增以下內(nèi)容:(具體的內(nèi)容根據(jù)你的安裝路徑修改,這步只是為了方便使用)
export HADOOP_HOME=/home/wenchu/hadoop-0.17.1
export PATH=$PATH:$HADOOP_HOME/bin
修改完畢后,執(zhí)行source /etc/profile
來使其生效。
- 在Master上執(zhí)行
Hadoop namenode –format
,這是第一需要做的初始化,可以看作格式化吧,以后除了在上面我提到過刪除了Master上的hadoop.tmp.dir
目錄,否則是不需要再次執(zhí)行的。
- 然后執(zhí)行Master上的
start-all.sh
,這個命令可以直接執(zhí)行,因為在6中已經(jīng)添加到了path路徑,這個命令是啟動hdfs和mapreduce兩部分,當(dāng)然你也可以分開單獨啟動hdfs和mapreduce,分別是bin目錄下的start-dfs.sh
和start-mapred.sh
。
- 檢查Master的logs目錄,看看Namenode日志以及JobTracker日志是否正常啟動。
- 檢查Slave的logs目錄看看Datanode日志以及TaskTracker日志是否正常。
- 如果需要關(guān)閉,那么就直接執(zhí)行
stop-all.sh
即可。
以上步驟就可以啟動Hadoop的分布式環(huán)境,然后在Master的機器進入Master的安裝目錄,執(zhí)行hadoop jar hadoop-0.17.1-examples.jar wordcount
輸入路徑和輸出路徑,就可以看到字數(shù)統(tǒng)計的效果了。此處的輸入路徑和輸出路徑都指的是HDFS中的路徑,因此你可以首先通過拷貝本地文件系統(tǒng)中的目錄到HDFS中的方式來建立HDFS中的輸入路徑:
hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。
其中/home/wenchu/test-in
是本地路徑,test-in
是將會建立在HDFS中的路徑,執(zhí)行完畢以后可以通過hadoop dfs –ls
看到test-in目錄已經(jīng)存在,同時可以通過hadoop dfs –ls test-in
查看里面的內(nèi)容。輸出路徑要求是在HDFS中不存在的,當(dāng)執(zhí)行完那個demo以后,就可以通過hadoop dfs –ls
輸出路徑看到其中的內(nèi)容,具體文件的內(nèi)容可以通過hadoop dfs –cat
文件名稱來查看。
經(jīng)驗總結(jié)和注意事項(這部分是我在使用過程中花了一些時間走的彎路):
- Master和Slave上的幾個conf配置文件不需要全部同步,如果確定都是通過Master去啟動和關(guān)閉,那么Slave機器上的配置不需要去維護。但如果希望在任意一臺機器都可以啟動和關(guān)閉Hadoop,那么就需要全部保持一致了。
- Master和Slave機器上的
/etc/hosts
中 必須把集群中機器都配置上去,就算在各個配置文件中使用的是IP。這個吃過不少苦頭,原來以為如果配成IP就不需要去配置Host,結(jié)果發(fā)現(xiàn)在執(zhí)行 Reduce的時候總是卡住,在拷貝的時候就無法繼續(xù)下去,不斷重試。另外如果集群中如果有兩臺機器的機器名如果重復(fù)也會出現(xiàn)問題。
- 如果在新增了節(jié)點或者刪除節(jié)點的時候出現(xiàn)了問題,首先就去刪除Slave的
hadoop.tmp.dir
,然后重新啟動試試看,如果還是不行那就干脆把Master的hadoop.tmp.dir
刪除(意味著dfs上的數(shù)據(jù)也會丟失),如果刪除了Master的hadoop.tmp.dir
,那么就需要重新namenode –format
。
- Map任務(wù)個數(shù)以及Reduce任務(wù)個數(shù)配置。前面分布式文件系統(tǒng)設(shè)計提到一個文件被放入到分布式文件系統(tǒng)中,會被分割成多個block放置到每一個的DataNode上,默認
dfs.block.size
應(yīng)該是64M,也就是說如果你放置到HDFS上的數(shù)據(jù)小于64,那么將只有一個Block,此時會被放置到某一個DataNode中,這個可以通過使用命令:hadoop dfsadmin –report
就可以看到各個節(jié)點存儲的情況。也可以直接去某一個DataNode查看目錄:hadoop.tmp.dir/dfs/data/current
就 可以看到那些block了。Block的數(shù)量將會直接影響到Map的個數(shù)。當(dāng)然可以通過配置來設(shè)定Map和Reduce的任務(wù)個數(shù)。Map的個數(shù)通常默認 和HDFS需要處理的blocks相同。也可以通過配置Map的數(shù)量或者配置minimum split size來設(shè)定,實際的個數(shù)為:max(min(block_size,data/#maps),min_split_size)
。Reduce可以通過這個公式計算:0.95*num_nodes*mapred.tasktracker.tasks.maximum
。
總的來說出了問題或者啟動的時候最好去看看日志,這樣心里有底。
Hadoop中的命令(Command)總結(jié)
這部分內(nèi)容其實可以通過命令的Help以及介紹了解,我主要側(cè)重于介紹一下我用的比較多的幾個命令。Hadoop dfs 這個命令后面加參數(shù)就是對于HDFS的操作,和Linux操作系統(tǒng)的命令很類似,例如:
Hadoop dfs –ls
就是查看/usr/root目錄下的內(nèi)容,默認如果不填路徑這就是當(dāng)前用戶路徑;
Hadoop dfs –rmr xxx
就是刪除目錄,還有很多命令看看就很容易上手;
Hadoop dfsadmin –report
這個命令可以全局的查看DataNode的情況;
Hadoop job
后面增加參數(shù)是對于當(dāng)前運行的Job的操作,例如list,kill等;
Hadoop balancer
就是前面提到的均衡磁盤負載的命令。
其他就不詳細介紹了。
三、Hadoop基本流程與應(yīng)用開發(fā)
Hadoop基本流程


一個圖片太大了,只好分割成為兩部分。根據(jù)流程圖來說一下具體一個任務(wù)執(zhí)行的情況。
- 在分布式環(huán)境中客戶端創(chuàng)建任務(wù)并提交。
- InputFormat做Map前的預(yù)處理,主要負責(zé)以下工作:
- 驗證輸入的格式是否符合JobConfig的輸入定義,這個在實現(xiàn)Map和構(gòu)建Conf的時候就會知道,不定義可以是Writable的任意子類。
- 將input的文件切分為邏輯上的輸入InputSplit,其實這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會被劃分為多個block。
- 通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來切分還需要RecordReader來實現(xiàn),例如最簡單的默認方式就是回車換行的切分)
- RecordReader處理后的結(jié)果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key和value對應(yīng)到臨時中間文件。
- Combiner可選擇配置,主要作用是在每一個Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過程中的數(shù)據(jù)傳輸量。
- Partitioner可選擇配置,主要作用是在多個Reduce的情況下,指定Map的結(jié)果由某一個Reduce處理,每一個Reduce都會有單獨的輸出文件。(后面的代碼實例中有介紹使用場景)
- Reduce執(zhí)行具體的業(yè)務(wù)邏輯,并且將處理結(jié)果輸出給OutputFormat。
- OutputFormat的職責(zé)是,驗證輸出目錄是否已經(jīng)存在,同時驗證輸出結(jié)果類型是否如Config中配置,最后輸出Reduce匯總后的結(jié)果。
業(yè)務(wù)場景和代碼范例
業(yè)務(wù)場景描述:可設(shè)定輸入和輸出路徑(操作系統(tǒng)的路徑非HDFS路徑),根據(jù)訪問日志分析某一個應(yīng)用訪問某一個API的總次數(shù)和總流量,統(tǒng)計后分別輸出到兩個文件中。這里僅僅為了測試,沒有去細分很多類,將所有的類都歸并于一個類便于說明問題。

測試代碼類圖
LogAnalysiser就是主類,主要負責(zé)創(chuàng)建、提交任務(wù),并且輸出部分信息。內(nèi)部的幾個子類用途可以參看流程中提到的角色職責(zé)。具體地看看幾個類和方法的代碼片斷:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text,
LongWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();//沒有配置RecordReader,所以默認采用line
的實現(xiàn),key就是行號,value就是行內(nèi)容
if (line == null || line.equals(""))
return;
String[] words = line.split(",");
if (words == null || words.length < 8)
return;
String appid = words[1];
String apiName = words[2];
LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
Text record = new Text();
record.set(new StringBuffer("flow::").append(appid)
.append("::").append(apiName).toString());
reporter.progress();
//輸出流量的統(tǒng)計結(jié)果,通過flow::作為前綴來標示。output.collect(record, recbytes);
record.clear();
record.set(new StringBuffer("count::").append(appid).append("::")
.append(apiName).toString());
//輸出次數(shù)的統(tǒng)計結(jié)果,通過count::作為前綴來標示
output.collect(record, new LongWritable(1));
}
}
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable>
{
public int getPartition(Text key, LongWritable value, int numPartitions)
{
if (numPartitions >= 2)//Reduce 個數(shù),判斷流量還是次數(shù)的統(tǒng)計分配到不同的Reduce
if (key.toString().startsWith("flow::"))
return 0;
else
return 1;
else
return 0;
}
public void configure(JobConf job){}
}
LogAnalysiser:: CombinerClass
參看ReduceClass,通常兩者可以使用一個,不過這里有些不同的處理就分成了兩個。在ReduceClass中藍色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase
implements Reducer<Text, LongWritable,Text, LongWritable>
{
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
{
Text newkey = new Text();
newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
LongWritable result = new LongWritable();
long tmp = 0;
int counter = 0;
while(values.hasNext())//累加同一個key的統(tǒng)計結(jié)果
{
tmp = tmp + values.next().get();
counter = counter +1;//擔(dān)心處理太久,JobTracker長時間沒有收到報告會認為TaskTracker已經(jīng)失效,因此定時報告一下
if (counter == 1000)
{
counter = 0;
reporter.progress();
}
}
result.set(tmp);
output.collect(newkey, result);//輸出最后的匯總結(jié)果
}
}
LogAnalysiser
public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();
if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn’t dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS中
conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會檢查
conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會檢查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//強制需要有兩個Reduce來分別處理流量和次數(shù)的統(tǒng)計
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//刪除輸入和輸出的臨時文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}
以上的代碼就完成了所有的邏輯性代碼,然后還需要一個注冊驅(qū)動類來注冊業(yè)務(wù)Class為一個可標示的命令,讓hadoop jar可以執(zhí)行。
public class ExampleDriver {
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
}
}
將代碼打成jar,并且設(shè)置jar的mainClass為ExampleDriver這個類。在分布式環(huán)境啟動以后執(zhí)行如下語句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會看見整個執(zhí)行過程,包括了Map和Reduce的進度。執(zhí)行完畢會 在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個文件:part-00000和part-00001分別記錄了統(tǒng)計后的結(jié)果。 如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce 的創(chuàng)建情況以及執(zhí)行情況。在運行期也可以通過瀏覽器來查看Map,Reduce的情況:http://MasterIP:50030 /jobtracker.jsp
Hadoop集群測試
首先這里使用上面的范例作為測試,也沒有做太多的優(yōu)化配置,這個測試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。
文件復(fù)制數(shù)為1,blocksize 5M
Slave數(shù) |
處理記錄數(shù)(萬條) |
執(zhí)行時間(秒) |
2 |
95 |
38 |
2 |
950 |
337 |
4 |
95 |
24 |
4 |
950 |
178 |
6 |
95 |
21 |
6 |
950 |
114 |
Blocksize 5M
Slave數(shù) |
處理記錄數(shù)(萬條) |
執(zhí)行時間(秒) |
2(文件復(fù)制數(shù)為1) |
950 |
337 |
2(文件復(fù)制數(shù)為3) |
950 |
339 |
6(文件復(fù)制數(shù)為1) |
950 |
114 |
6(文件復(fù)制數(shù)為3) |
950 |
117 |
文件復(fù)制數(shù)為1
Slave數(shù) |
處理記錄數(shù)(萬條) |
執(zhí)行時間(秒) |
6(blocksize 5M) |
95 |
21 |
6(blocksize 77M) |
95 |
26 |
4(blocksize 5M) |
950 |
178 |
4(blocksize 50M) |
950 |
54 |
6(blocksize 5M) |
950 |
114 |
6(blocksize 50M) |
950 |
44 |
6(blocksize 77M) |
950 |
74 |
測試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測幾次同樣條件下都是一樣。通過測試結(jié)果可以看出以下幾點:
- 機器數(shù)對于性能還是有幫助的(等于沒說^_^)。
- 文件復(fù)制數(shù)的增加只對安全性有幫助,但是對于性能沒有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準備的時間很長。
- blocksize對于性能影響很大,首先如果將block劃分的太小,那么將會增加job的數(shù)量,同時也增加了協(xié)作的代價,降低了性能,但是配置的太大也會讓job不能最大化并行處理。所以這個值的配置需要根據(jù)數(shù)據(jù)處理的量來考慮。
- 最后就是除了這個表里面列出來的結(jié)果,應(yīng)該去仔細看輸出目錄中的_logs/history中的xxx_analysisjob這個文件,里面記錄了全部的執(zhí)行過程以及讀寫情況。這個可以更加清楚地了解哪里可能會更加耗時。
隨想
“云計算”熱的燙手,就和SAAS、Web2及SNS等一樣,往往都是在搞概念,只有真正踏踏實實的大型互聯(lián)網(wǎng)公司,才會投入人力物力去研究符合自 己的分布式計算。其實當(dāng)你的數(shù)據(jù)量沒有那么大的時候,這種分布式計算也就僅僅只是一個玩具而已,只有在真正解決問題的過程中,它深層次的問題才會被挖掘出 來。
這三篇文章(分布式計算開源框架Hadoop介紹,Hadoop中的集群配置和使用技巧)僅僅是為了給對分布式計算有興趣的朋友拋個磚,要想真的掘到金 子,那么就踏踏實實的去用、去想、去分析。或者自己也會更進一步地去研究框架中的實現(xiàn)機制,在解決自己問題的同時,也能夠貢獻一些什么。
前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實有多少架構(gòu)師知道什么叫做架構(gòu)?架構(gòu)師的職責(zé)是什么?與其追求這么一個名號,還不如踏踏實實地做塊石頭沉到水底。要知道,積累和沉淀的過程就是一種成長。