#
GET、PUT是ONLINE的操作,MAPREDUCE是OFFLINE的操作
HDFS寫流程
客戶端收到要保存文件的請求后,將文件以64M為單位拆成若干份BLOCK,形成一個列表,即由幾個BLOCK組成,將這些信息告訴NAME NODE,我要保存這個,NAME NODE算出一個列表,哪段BLOCK應該寫到哪個DATA NODE,客戶端將第一個BLOCK傳到第一個節點DATA NODE A,通知其保存,同時讓它通知DATA NODE D和DATA NODE B也保存一份,DATA NODE D收到信息后進行了保存,同時通知DATA NODE B保存一份,DATA NODE B保存完成后則通知客戶端保存完成,客戶端再去向NAME NODE中取下一個BLOCK要保存的位置,重復以上的動作,直到所有的BLOCK都保存完成。
HDFS讀流程
客戶端向NAME NODE請求讀一個文件,NAME NODE返回這個文件所構成的所有BLOCK的DATA NODE IP及BLOCK ID,客戶端并行的向各DATA NODE發出請求,要取某個BLOCK ID的BLOCK,DATA NODE發回所要的BLOCK給客戶端,客戶端收集到所有的BLOCK后,整合成一個完整的文件后,此流程結束。
MAPREDUCE流程
輸入數據 -- 非多線程了,而是多進程的挑選數據,即將輸入數據分成多塊,每個進程處理一塊 -- 分組 -- 多進程的匯集數據 -- 輸出
HBASE表結構
HBASE中將一個大表數據分成不同的小表,每個小表叫REGION,存放REGION的服務器叫REGIONSERVER,一個REGIONSERVER可以存放多個REGION。通常REGIONSERVER和DATA NODE是在同一服務器,以減少NETWORK IO。
-ROOT-表存放于MASTER SERVER上,記錄了一共有多少個REGIONSERVER,每個REGION SERVER上都有一個.META.表,上面記錄了本REGION SERVER放有哪幾個表的哪幾個REGION。如果要知道某個表共有幾個REGION,就得去所有的REGION SERVER上查.META.表,進行匯總才能得知。
客戶端如果要查ROW009的信息,先去咨詢ZOOPKEEPER,-ROOT-表在哪里,然后問-ROOT-表,哪個.META.知道這個信息,然后去問.META.表,哪個REGION有這個信息,然后去那個REGION問ROW009的信息,然后那個REGION返回此信息。
HBASE MAPREDUCE
一個REGION一個MAP任務,而任務里的map方法執行多少次,則由查詢出來的記錄有多少條,則執行多少次。
REDUCE任務負責向REGION寫數據,但寫到哪個REGION則由那個KEY歸屬哪個REGION管,則寫到哪個REGION,有可能REDUCE任務會和所有的REGION SERVER交互。
在HBASE的MAPREDUCE JOB中使用JOIN
REDUCE-SIDE JOIN
利用現有的SHUTTLE分組機制,在REDUCE階段做JOIN,但由于MAP階段數據大,可能會有性能問題。
MAP-SIDE JOIN
將數據較少的一表讀到一公共文件中,然后在MPA方法中循環另一表的數據,再將要的數據從公共文件中讀取。這樣可以減少SHUTTLE和SORT的時間,同時也不需要REDUCE任務。
1) 在Reduce階段進行Join,這樣運算量比較小.(這個適合被Join的數據比較小的情況下.)
2) 壓縮字段,對數據預處理,過濾不需要的字段.
3) 最后一步就是在Mapper階段過濾,這個就是Bloom Filter的用武之地了.也就是需要詳細說明的地方.
下面就拿一個我們大家都熟悉的場景來說明這個問題: 找出上個月動感地帶的客戶資費的使用情況,包括接入和撥出.
(這個只是我臆想出來的例子,根據實際的DB數據存儲結構,在這個場景下肯定有更好的解決方案,大家不要太較真哦)
這個時候的兩個個數據集都是比較大的,這兩個數據集分別是:上個月的通話記錄,動感地帶的手機號碼列表.
比較直接的處理方法有2種:
1)在 Reduce 階段,通過動感地帶號碼來過濾. 優點:這樣需要處理的數據相對比較少,這個也是比較常用的方法.
缺點:很多數據在Mapper階段花了老鼻子力氣匯總了,還通過網絡Shuffle到Reduce節點,結果到這個階段給過濾了.
2)在 Mapper 階段時,通過動感地帶號碼來過濾數據. 優點:這樣可以過濾很多不是動感地帶的數據,比如神州行,全球通.這些過濾的數據就可以節省很多網絡帶寬了.
缺點:就是動感地帶的號碼不是小數目,如果這樣處理就需要把這個大塊頭復制到所有的Mapper節點,甚至是Distributed Cache.(Bloom Filter就是用來解決這個問題的)
Bloom Filter就是用來解決上面方法2的缺點的.
方法2的缺點就是大量的數據需要在多個節點復制.Bloom Filter通過多個Hash算法, 把這個號碼列表壓縮到了一個Bitmap里面. 通過允許一定的錯誤率來換空間, 這個和我們平時經常提到的時間和空間的互換類似.詳細情況可以參考:
http://blog.csdn.net/jiaomeng/article/details/1495500
但是這個算法也是有缺陷的,就是會把很多神州行,全球通之類的號碼當成動感地帶.但在這個場景中,這根本不是問題.因為這個算法只是過濾一些號碼,漏網之魚會在Reduce階段進行精確匹配時顧慮掉.
這個方法改進之后基本上完全回避了方法2的缺點:
1) 沒有大量的動感地帶號碼發送到所有的Mapper節點.
2) 很多非動感地帶號碼在Mapper階段就過濾了(雖然不是100%),避免了網絡帶寬的開銷及延時.
繼續需要學習的地方:Bitmap的大小, Hash函數的多少, 以及存儲的數據的多少. 這3個變量如何取值才能才能在存儲空間與錯誤率之間取得一個平衡.
NAME NODE起保存DATA NODE上文件的位置信息用,主要有兩個保存文件:FsImage和EditLog,FsImage保存了上一次NAME NODE啟動時的狀態,EditLog則記錄每次成功后的對HDFS的操作行為。當NAME NODE重啟時,會合并FsImage和EditLog成為一個新的FsImage,清空EditLog,如果EditLog非常大的時候,則NAME NODE啟動的時間會非常長。因此就有SECOND NAME NODE。
SECOND NAME NODE會以HTTP的方式向NAME NODE要這兩個文件,當NAME NODE收到請求時,就會韋一個新的EditLog來記錄,這時SECOND NAME NODE就會將取得的這兩個文件合并,成一個新的FsImage,再發給NAME NODE,NAME NODE收到后,就會以這個為準,舊的就會歸檔不用。
SECOND NAME NODE還有一個用途就是當NAME NODE DOWN了的時候,可以改SECOND NAME NODE的IP為NAME NODE所用的IP,當NAME NODE用。
secondary namenoded 配置很容易被忽視,如果jps檢查都正常,大家通常不會太關心,除非namenode發生問題的時候,才會想起還有個secondary namenode,它的配置共兩步:
- 集群配置文件conf/master中添加secondarynamenode的機器
- 修改/添加 hdfs-site.xml中如下屬性:
<property>
<name>dfs.http.address</name>
<value>{your_namenode_ip}:50070</value>
<description>
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
這兩項配置OK后,啟動集群。進入secondary namenode 機器,檢查fs.checkpoint.dir(core-site.xml文件,默認為${hadoop.tmp.dir}/dfs/namesecondary)目錄同步狀態是否和namenode一致的。
如果不配置第二項則,secondary namenode同步文件夾永遠為空,這時查看secondary namenode的log顯示錯誤為:
2011-06-09 11:06:41,430 INFO org.apache.hadoop.hdfs.server.common.Storage: Recovering storage directory /tmp/hadoop-hadoop/dfs/namesecondary from failed checkpoint.
2011-06-09 11:06:41,433 ERROR org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Exception in doCheckpoint:
2011-06-09 11:06:41,434 ERROR org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:211)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
at java.net.Socket.connect(Socket.java:529)
at java.net.Socket.connect(Socket.java:478)
at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:394)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:529)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:233)
at sun.net.www.http.HttpClient.New(HttpClient.java:306)
at sun.net.www.http.HttpClient.New(HttpClient.java:323)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:970)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:911)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:836)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1172)
at org.apache.hadoop.hdfs.server.namenode.TransferFsImage.getFileClient(TransferFsImage.java:151)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.downloadCheckpointFiles(SecondaryNameNode.java:256)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.doCheckpoint(SecondaryNameNode.java:313)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.run(SecondaryNameNode.java:225)
at java.lang.Thread.run(Thread.java:662)
可能用到的core-site.xml文件相關屬性:
<property>
<name>fs.checkpoint.period</name>
<value>300</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
采用Cloudera版本的hadoop/hbase:
hadoop-0.20.2-cdh3u0
hbase-0.90.1-cdh3u0
zookeeper-3.3.3-cdh3u0
默認已支持FairScheduler調度算法.
只需改配置使期用FairSchedule而非默認的JobQueueTaskScheduler即可.
配置fair-scheduler.xml (/$HADOOP_HOME/conf/):
<?xml version="1.0"?>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>[HADOOP_HOME]/conf/fair-scheduler.xml</value>
</property>
<allocations>
<pool name="qiji-task-pool">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
<maxRunningJobs>
<maxRunningJobs>5</maxRunningJobs>
<minSharePreemptionTimeout>300</minSharePreemptionTimeout>
<weight>1.0</weight>
</pool>
<user name="ecap">
<maxRunningJobs>
<maxRunningJobs>6</maxRunningJobs>
</user>
<poolMaxJobsDefault>10</poolMaxJobsDefault>
<userMaxJobsDefault>8</userMaxJobsDefault>
<defaultMinSharePreemptionTimeout>600
</defaultMinSharePreemptionTimeout>
<fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
</allocations>
配置$HADOOP_HOME/conf/mapred-site.xml,最后添加:
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>/opt/hadoop/conf/fair-scheduler.xml</value>
</property>
<property>
<name>mapred.fairscheduler.assignmultiple</name>
<value>true</value>
</property>
<property>
<name>mapred.fairscheduler.sizebasedweight</name>
<value>true</value>
</property>
然后重新運行集群,這樣有幾個Job(上面配置是5個并行)并行運行時,不會因為一個Job把Map/Reduce占滿而使其它Job處于Pending狀態.
可從: http://<masterip>:50030/scheduler查看并行運行的狀態.
挺有意思的題目。
1. 給你A,B兩個文件,各存放50億條URL,每條URL占用64字節,內存限制是4G,讓你找出:A,B文件共同的URL。 解法一:Hash成內存大小的小塊文件,然后分塊內存內查交集。
解法二:Bloom Filter(廣泛應用于URL過濾、查重。參考http://en.wikipedia.org/wiki/Bloom_filter、http://blog.csdn.net/jiaomeng/archive/2007/01/28/1496329.aspx)
2. 有10個文件,每個文件1G, 每個文件的每一行都存放的是用戶的query,每個文件的query都可能重復。要你按照query的頻度排序。解法一:根據數據稀疏程度算法會有不同,通用方法是用Hash把文件重排,讓相同query一定會在同一個文件,同時進行計數,然后歸并,用最小堆來統計頻度最大的。
解法二:類似1,但是用的是與簡單Bloom Filter稍有不同的CBF(Counting Bloom Filter)或者更進一步的SBF(Spectral Bloom Filter,參考http://blog.csdn.net/jiaomeng/archive/2007/03/19/1534238.aspx)
解法三:MapReduce,幾分鐘可以在hadoop集群上搞定。參考http://en.wikipedia.org/wiki/MapReduce
3. 有一個1G大小的一個文件,里面每一行是一個詞,詞的大小不超過16個字節,內存限制大小是1M。返回頻數最高的100個詞。解法一:跟2類似,只是不需要排序,各個文件分別統計前100,然后一起找前100。
摘取了一部分,全文請查看
http://blog.sina.com.cn/s/blog_633f4ab20100r9nm.html
背景
“這是最好的時代,也是最壞的時代。”
每個時代的人都在這么形容自己所處的時代。在一次次IT浪潮下面,有人覺得當下乏味無聊,有人卻能銳意進取,找到突破。數據存儲這個話題自從有了計算機之后,就一直是一個有趣或者無聊的主題。上世紀七十年代,關系數據庫理論的出現,造就了一批又一批傳奇,并推動整個世界信息化到了一個新的高度。而進入新千年以來,隨著SNS等應用的出現,傳統的SQL數據庫已經越來越不適應海量數據的處理了。于是,這幾年NoSQL數據庫的呼聲也越來越高。
在NoSQL數據庫當中,呼聲最高的是HBase和Cassandra兩個。雖然嚴格意義上來說,兩者服務的目的有所不同,側重點也不盡相同,但是作為當前開源NoSQL數據庫的佼佼者,兩者經常被用來做各種比較。
去年十月,Facebook推出了他的新的Message系統。Facebook宣布他們采用HBase作為后臺存儲系統。這引起了一片喧嘩聲。因為Cassandra恰恰是Facebook開發,并且于2008年開源。這讓很多人驚呼,是否是Cassandra已經被Facebook放棄了?HBase在這場NoSQL數據庫的角力當中取得了決定性的勝利?本文打算主要從技術角度分析,HBase和Cassandra的異同,并非要給出任何結論,只是共享自己研究的一些結果。
選手簡介
HBase
HBase是一個開源的分布式存儲系統。他可以看作是Google的Bigtable的開源實現。如同Google的Bigtable使用Google File System一樣,HBase構建于和Google File System類似的Hadoop HDFS之上。
Cassandra
Cassandra可以看作是Amazon Dynamo的開源實現。和Dynamo不同之處在于,Cassandra結合了Google Bigtable的ColumnFamily的數據模型。可以簡單地認為,Cassandra是一個P2P的,高可靠性并具有豐富的數據模型的分布式文件系統。
分布式文件系統的指標
根據UC Berkeley的教授Eric Brewer于2000年提出猜測- CAP定理,一個分布式計算機系統,不可能同時滿足以下三個指標:
Consistency 所有節點在同一時刻保持同一狀態Availability 某個節點失敗,不會影響系統的正常運行Partition tolerance 系統可以因為網絡故障等原因被分裂成小的子系統,而不影響系統的運行
Brewer教授推測,任何一個系統,同時只能滿足以上兩個指標。
在2002年,MIT的Seth Gilbert和Nancy Lynch發表正式論文論證了CAP定理。
而HBase和Cassandra兩者都屬于分布式計算機系統。但是其設計的側重點則有所不同。HBase繼承于Bigtable的設計,側重于CA。而Cassandra則繼承于Dynamo的設計,側重于AP。
。。。。。。。。。。。。。。。。。。。
特性比較
由于HBase和Cassandra的數據模型比較接近,所以這里就不再比較兩者之間數據模型的異同了。接下來主要比較雙方在數據一致性、多拷貝復制的特性。
HBase
HBase保證寫入的一致性。當一份數據被要求復制N份的時候,只有N份數據都被真正復制到N臺服務器上之后,客戶端才會成功返回。如果在復制過程中出現失敗,所有的復制都將失敗。連接上任何一臺服務器的客戶端都無法看到被復制的數據。HBase提供行鎖,但是不提供多行鎖和事務。HBase基于HDFS,因此數據的多份復制功能和可靠性將由HDFS提供。HBase和MapReduce天然集成。
Cassandra
寫入的時候,有多種模式可以選擇。當一份數據模式被要求復制N份的時候,可以立即返回,可以成功復制到一個服務器之后返回,可以等到全部復制到N份服務器之后返回,還可以設定一個復制到quorum份服務器之后返回。Quorum后面會有具體解釋。復制不會失敗。最終所有節點數據都將被寫入。而在未被完全寫入的時間間隙,連接到不同服務器的客戶端有可能讀到不同的數據。在集群里面,所有的服務器都是等價的。不存在任何一個單點故障。節點和節點之間通過Gossip協議互相通信。寫入順序按照timestamp排序,不提供行鎖。新版本的Cassandra已經集成了MapReduce了。
相對于配置Cassandra,配置HBase是一個艱辛、復雜充滿陷阱的工作。Facebook關于為何采取HBase,里面有一句,大意是,Facebook長期以來一直關注HBase的開發并且有一只專門的經驗豐富的HBase維護的team來負責HBase的安裝和維護。可以想象,Facebook內部關于使用HBase和Cassandra有過激烈的斗爭,最終人數更多的HBase team占據了上風。對于大公司來說,養一只相對龐大的類似DBA的team來維護HBase不算什么大的開銷,但是對于小公司,這實在不是一個可以負擔的起的開銷。
另外HBase在高可靠性上有一個很大的缺陷,就是HBase依賴HDFS。HDFS是Google File System的復制品,NameNode是HDFS的單點故障點。而到目前為止,HDFS還沒有加入NameNode的自我恢復功能。不過我相信,Facebook在內部一定有恢復NameNode的手段,只是沒有開源出來而已。
相反,Cassandra的P2P和去中心化設計,沒有可能出現單點故障。從設計上來看,Cassandra比HBase更加可靠。
關于數據一致性,實際上,Cassandra也可以以犧牲響應時間的代價來獲得和HBase一樣的一致性。而且,通過對Quorum的合適的設置,可以在響應時間和數據一致性得到一個很好的折衷值。
Cassandra優缺點
主要表現在:
配置簡單,不需要多模塊協同操作。功能靈活性強,數據一致性和性能之間,可以根據應用不同而做不同的設置。 可靠性更強,沒有單點故障。
盡管如此,Cassandra就沒有弱點嗎?當然不是,Cassandra有一個致命的弱點。
這就是存儲大文件。雖然說,Cassandra的設計初衷就不是存儲大文件,但是Amazon的S3實際上就是基于Dynamo構建的,總是會讓人想入非非地讓Cassandra去存儲超大文件。而和Cassandra不同,HBase基于HDFS,HDFS的設計初衷就是存儲超大規模文件并且提供最大吞吐量和最可靠的可訪問性。因此,從這一點來說,Cassandra由于背后不是一個類似HDFS的超大文件存儲的文件系統,對于存儲那種巨大的(幾百T甚至P)的超大文件目前是無能為力的。而且就算由Client手工去分割,這實際上是非常不明智和消耗Client CPU的工作的。
因此,如果我們要構建一個類似Google的搜索引擎,最少,HDFS是我們所必不可少的。雖然目前HDFS的NameNode還是一個單點故障點,但是相應的Hack可以讓NameNode變得更皮實。基于HDFS的HBase相應地,也更適合做搜索引擎的背后倒排索引數據庫。事實上,Lucene和HBase的結合,遠比Lucene結合Cassandra的項目Lucandra要順暢和高效的多。(Lucandra要求Cassandra使用OrderPreservingPartitioner,這將可能導致Key的分布不均勻,而無法做負載均衡,產生訪問熱點機器)。
所以我的結論是,在這個需求多樣化的年代,沒有贏者通吃的事情。而且我也越來越不相信在工程界存在一勞永逸和一成不變的解決方案。當你僅僅是存儲海量增長的消息數據,存儲海量增長的圖片,小視頻的時候,你要求數據不能丟失,你要求人工維護盡可能少,你要求能迅速通過添加機器擴充存儲,那么毫無疑問,Cassandra現在是占據上風的。
但是如果你希望構建一個超大規模的搜索引擎,產生超大規模的倒排索引文件(當然是邏輯上的文件,真實文件實際上被切分存儲于不同的節點上),那么目前HDFS+HBase是你的首選。
就讓這個看起來永遠正確的結論結尾吧,上帝的歸上帝,凱撒的歸凱撒。大家都有自己的地盤,野百合也會有春天的!
http://www.jdon.com/38244最近因為項目原因,研究了Cassandra,Hbase等幾個NoSQL數據庫,最終決定采用HBase。在這里,我就向大家分享一下自己對HBase的理解。
在說HBase之前,我想再嘮叨幾句。做互聯網應用的哥們兒應該都清楚,互聯網應用這東西,你沒辦法預測你的系統什么時候會被多少人訪問,你面臨的用戶到底有多少,說不定今天你的用戶還少,明天系統用戶就變多了,結果您的系統應付不過來了了,不干了,這豈不是咱哥幾個的悲哀,說時髦點就叫“杯具啊”。
其實說白了,這些就是事先沒有認清楚互聯網應用什么才是最重要的。從系統架構的角度來說,互聯網應用更加看重系統性能以及伸縮性,而傳統企業級應用都是比較看重數據完整性和數據安全性。那么我們就來說說互聯網應用伸縮性這事兒.對于伸縮性這事兒,哥們兒我也寫了幾篇博文,想看的兄弟可以參考我以前的博文,對于web server,app server的伸縮性,我在這里先不說了,因為這部分的伸縮性相對來說比較容易一點,我主要來回顧一些一個慢慢變大的互聯網應用如何應對數據庫這一層的伸縮。
首先剛開始,人不多,壓力也不大,搞一臺數據庫服務器就搞定了,此時所有的東東都塞進一個Server里,包括web server,app server,db server,但是隨著人越來越多,系統壓力越來越多,這個時候可能你把web server,app server和db server分離了,好歹這樣可以應付一陣子,但是隨著用戶量的不斷增加,你會發現,數據庫這哥們不行了,速度老慢了,有時候還會宕掉,所以這個時候,你得給數據庫這哥們找幾個伴,這個時候Master-Salve就出現了,這個時候有一個Master Server專門負責接收寫操作,另外的幾個Salve Server專門進行讀取,這樣Master這哥們終于不抱怨了,總算讀寫分離了,壓力總算輕點了,這個時候其實主要是對讀取操作進行了水平擴張,通過增加多個Salve來克服查詢時CPU瓶頸。一般這樣下來,你的系統可以應付一定的壓力,但是隨著用戶數量的增多,壓力的不斷增加,你會發現Master server這哥們的寫壓力還是變的太大,沒辦法,這個時候怎么辦呢?你就得切分啊,俗話說“只有切分了,才會有伸縮性嘛”,所以啊,這個時候只能分庫了,這也是我們常說的數據庫“垂直切分”,比如將一些不關聯的數據存放到不同的庫中,分開部署,這樣終于可以帶走一部分的讀取和寫入壓力了,Master又可以輕松一點了,但是隨著數據的不斷增多,你的數據庫表中的數據又變的非常的大,這樣查詢效率非常低,這個時候就需要進行“水平分區”了,比如通過將User表中的數據按照10W來劃分,這樣每張表不會超過10W了。
綜上所述,一般一個流行的web站點都會經歷一個從單臺DB,到主從復制,到垂直分區再到水平分區的痛苦的過程。其實數據庫切分這事兒,看起來原理貌似很簡單,如果真正做起來,我想凡是sharding過數據庫的哥們兒都深受其苦啊。對于數據庫伸縮的文章,哥們兒可以看看后面的參考資料介紹。
好了,從上面的那一堆廢話中,我們也發現數據庫存儲水平擴張scale out是多么痛苦的一件事情,不過幸好技術在進步,業界的其它弟兄也在努力,09年這一年出現了非常多的NoSQL數據庫,更準確的應該說是No relation數據庫,這些數據庫多數都會對非結構化的數據提供透明的水平擴張能力,大大減輕了哥們兒設計時候的壓力。下面我就拿Hbase這分布式列存儲系統來說說。
一 Hbase是個啥東東?
在說Hase是個啥家伙之前,首先我們來看看兩個概念,面向行存儲和面向列存儲。面向行存儲,我相信大伙兒應該都清楚,我們熟悉的RDBMS就是此種類型的,面向行存儲的數據庫主要適合于事務性要求嚴格場合,或者說面向行存儲的存儲系統適合OLTP,但是根據CAP理論,傳統的RDBMS,為了實現強一致性,通過嚴格的ACID事務來進行同步,這就造成了系統的可用性和伸縮性方面大大折扣,而目前的很多NoSQL產品,包括Hbase,它們都是一種最終一致性的系統,它們為了高的可用性犧牲了一部分的一致性。好像,我上面說了面向列存儲,那么到底什么是面向列存儲呢?Hbase,Casandra,Bigtable都屬于面向列存儲的分布式存儲系統。看到這里,如果您不明白Hbase是個啥東東,不要緊,我再總結一下下:
Hbase是一個面向列存儲的分布式存儲系統,它的優點在于可以實現高性能的并發讀寫操作,同時Hbase還會對數據進行透明的切分,這樣就使得存儲本身具有了水平伸縮性。
二 Hbase數據模型
HBase,Cassandra的數據模型非常類似,他們的思想都是來源于Google的Bigtable,因此這三者的數據模型非常類似,唯一不同的就是Cassandra具有Super cloumn family的概念,而Hbase目前我沒發現。好了,廢話少說,我們來看看Hbase的數據模型到底是個啥東東。
在Hbase里面有以下兩個主要的概念,Row key,Column Family,我們首先來看看Column family,Column family中文又名“列族”,Column family是在系統啟動之前預先定義好的,每一個Column Family都可以根據“限定符”有多個column.下面我們來舉個例子就會非常的清晰了。
假如系統中有一個User表,如果按照傳統的RDBMS的話,User表中的列是固定的,比如schema 定義了name,age,sex等屬性,User的屬性是不能動態增加的。但是如果采用列存儲系統,比如Hbase,那么我們可以定義User表,然后定義info 列族,User的數據可以分為:info:name = zhangsan,info:age=30,info:sex=male等,如果后來你又想增加另外的屬性,這樣很方便只需要info:newProperty就可以了。
也許前面的這個例子還不夠清晰,我們再舉個例子來解釋一下,熟悉SNS的朋友,應該都知道有好友Feed,一般設計Feed,我們都是按照“某人在某時做了標題為某某的事情”,但是同時一般我們也會預留一下關鍵字,比如有時候feed也許需要url,feed需要image屬性等,這樣來說,feed本身的屬性是不確定的,因此如果采用傳統的關系數據庫將非常麻煩,況且關系數據庫會造成一些為null的單元浪費,而列存儲就不會出現這個問題,在Hbase里,如果每一個column 單元沒有值,那么是占用空間的。下面我們通過兩張圖來形象的表示這種關系:
上圖是傳統的RDBMS設計的Feed表,我們可以看出feed有多少列是固定的,不能增加,并且為null的列浪費了空間。但是我們再看看下圖,下圖為Hbase,Cassandra,Bigtable的數據模型圖,從下圖可以看出,Feed表的列可以動態的增加,并且為空的列是不存儲的,這就大大節約了空間,關鍵是Feed這東西隨著系統的運行,各種各樣的Feed會出現,我們事先沒辦法預測有多少種Feed,那么我們也就沒有辦法確定Feed表有多少列,因此Hbase,Cassandra,Bigtable的基于列存儲的數據模型就非常適合此場景。說到這里,采用Hbase的這種方式,還有一個非常重要的好處就是Feed會自動切分,當Feed表中的數據超過某一個閥值以后,Hbase會自動為我們切分數據,這樣的話,查詢就具有了伸縮性,而再加上Hbase的弱事務性的特性,對Hbase的寫入操作也將變得非常快。
上面說了Column family,那么我之前說的Row key是啥東東,其實你可以理解row key為RDBMS中的某一個行的主鍵,但是因為Hbase不支持條件查詢以及Order by等查詢,因此Row key的設計就要根據你系統的查詢需求來設計了額。我還拿剛才那個Feed的列子來說,我們一般是查詢某個人最新的一些Feed,因此我們Feed的Row key可以有以下三個部分構成<userId><timestamp><feedId>,這樣以來當我們要查詢某個人的最進的Feed就可以指定Start Rowkey為<userId><0><0>,End Rowkey為<userId><Long.MAX_VALUE><Long.MAX_VALUE>來查詢了,同時因為Hbase中的記錄是按照rowkey來排序的,這樣就使得查詢變得非常快。
三 Hbase的優缺點
1 列的可以動態增加,并且列為空就不存儲數據,節省存儲空間.
2 Hbase自動切分數據,使得數據存儲自動具有水平scalability.
3 Hbase可以提供高并發讀寫操作的支持
Hbase的缺點:
1 不能支持條件查詢,只支持按照Row key來查詢.
2 暫時不能支持Master server的故障切換,當Master宕機后,整個存儲系統就會掛掉.
關于數據庫伸縮性的一點資料:
http://www.jurriaanpersyn.com/archives/2009/02/12/database-sharding-at-netlog-with-mysql-and-php/http://adam.blog.heroku.com/past/2009/7/6/sql_databases_dont_scale/
- 將INPUT通過SPLIT成M個MAP任務
- JOB TRACKER將這M個任務分派給TASK TRACKER執行
- TASK TRACKER執行完MAP任務后,會在本地生成文件,然后通知JOB TRACKER
- JOB TRACKER收到通知后,將此任務標記為已完成,如果收到失敗的消息,會將此任務重置為原始狀態,再分派給另一TASK TRACKER執行
- 當所有的MAP任務完成后,JOB TRACKER將MAP執行后生成的LIST重新整理,整合相同的KEY,根據KEY的數量生成R個REDUCE任務,再分派給TASK TRACKER執行
- TASK TRACKER執行完REDUCE任務后,會在HDFS生成文件,然后通知JOB TRACKER
- JOB TRACKER等到所有的REDUCE任務執行完后,進行合并,產生最后結果,通知CLIENT
- TASK TRACKER執行完MAP任務時,可以重新生成新的KEY VALUE對,從而影響REDUCE個數
- 假設遠程HADOOP主機名為ubuntu,則應在hosts文件中加上192.168.58.130 ubuntu
- 新建MAVEN項目,加上相應的配置
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloudputing</groupId>
<artifactId>bigdata</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>bigdata</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.1</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.90.2</version>
</dependency> -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
</dependencies>
</project>
-
hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://ubuntu:9000/hbase</value>
</property>
<!-- 在構造JOB時,會新建一文件夾來準備所需文件。
如果這一段沒寫,則默認本地環境為LINUX,將用LINUX命令去實施,在WINDOWS環境下會出錯 -->
<property>
<name>mapred.job.tracker</name>
<value>ubuntu:9001</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 此處會向ZOOKEEPER咨詢JOB TRACKER的可用IP -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>ubuntu</value>
</property>
<property skipInDoc="true">
<name>hbase.defaults.for.version</name>
<value>0.94.1</value>
</property>
</configuration>
- 測試文件:MapreduceTest.java
package com.cloudputing.mapreduce;
import java.io.IOException;
import junit.framework.TestCase;
public class MapreduceTest extends TestCase{
public void testReadJob() throws IOException, InterruptedException, ClassNotFoundException
{
MapreduceRead.read();
}
}
-
MapreduceRead.java
package com.cloudputing.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
public class MapreduceRead {
public static void read() throws IOException, InterruptedException, ClassNotFoundException
{
// Add these statements. XXX
// File jarFile = EJob.createTempJar("target/classes");
// EJob.addClasspath("D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/src/main/resources");
// ClassLoader classLoader = EJob.getClassLoader();
// Thread.currentThread().setContextClassLoader(classLoader);
Configuration config = HBaseConfiguration.create();
addTmpJar("file:/D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/target/bigdata-1.0.jar",config);
Job job = new Job(config, "ExampleRead");
// And add this statement. XXX
// ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
// TableMapReduceUtil.addDependencyJars(job);
// TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
// MapreduceRead.class,MyMapper.class);
job.setJarByClass(MapreduceRead.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
"wiki", // input HBase table name
scan, // Scan instance to control CF and attribute selection
MapreduceRead.MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
// DistributedCache.addFileToClassPath(new Path("hdfs://node.tracker1:9000/user/root/lib/stat-analysis-mapred-1.0-SNAPSHOT.jar"),job.getConfiguration());
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
/**
* 為Mapreduce添加第三方jar包
*
* @param jarPath
* 舉例:D:/Java/new_java_workspace/scm/lib/guava-r08.jar
* @param conf
* @throws IOException
*/
public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
System.setProperty("path.separator", ":");
FileSystem fs = FileSystem.getLocal(conf);
String newJarPath = new Path(jarPath).makeQualified(fs).toString();
String tmpjars = conf.get("tmpjars");
if (tmpjars == null || tmpjars.length() == 0) {
conf.set("tmpjars", newJarPath);
} else {
conf.set("tmpjars", tmpjars + ":" + newJarPath);
}
}
public static class MyMapper extends TableMapper<Text, Text> {
public void map(ImmutableBytesWritable row, Result value,
Context context) throws InterruptedException, IOException {
String val1 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual1")));
String val2 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual2")));
System.out.println(val1 + " -- " + val2);
}
private String getValue(byte [] value)
{
return value == null? "null" : new String(value);
}
}
}
界面:

算法:

說明:
http://lilyproject.org/books/daisy_docs_book--2_3/publications/html-chunked/output/s182.html
注意:
此處的ACL可以是一個系統多個的,如某些情況用不同的ACL。
資源:可以指文檔ID,頁面ID之類的,由于文檔可能很多個,因此用表達式代替之。
角色:指ROLE/USER之類的。
動作(PERMISSION):指操作類型,如讀、寫、刪除等。
結果(ACTION):指GRANT、DENNY等。
具體實現方式:根據表達式進行運算,看哪個表達式為TRUE,則用哪個,再傳入PERMISSION 類型,角色,看ACTION是GRANT還是DENNY,如果是GRANT則授權通過,DENNY則授權不通過。