#
這項利器是由CRM領域的領導Saleforce發布的。相當于HBase的JDBC。
具體詳見:
https://github.com/forcedotcom/phoenix支持select,from,where,groupby,having,orderby和建表操作,未來將支持二級索引,join操作,動態列簇等功能。
是建立在原生HBASE API基礎上的,響應時間10M級別的數據是毫秒,100M級別是秒。
WireframeSketcher是一個Eclipse 插件,用于創建線框圖,界面模型和UI原型。
項目正式開發前創建原型可以幫助用戶和開發者理解系統,使用WireframeSketcher在Eclipse中創建能夠更好的集成進入你的項目開發流程。
WireframeSketcher 如何工作?它提供了一個pre-drawn,text-driven 預制圖,文本驅動的widgets,能夠展現通用UI界面,你可以拖拽他們進入編輯器迅速畫出你的界面。界面用XML存儲。
- HBASE的SHELL命令使用
- HBASE的JAVA CLIENT的使用
新增和修改記錄用PUT。
PUT的執行流程:
首先會在內存中增加MEMSTORE,如果這個表有N個COLOUMN FAMILY,則會產生N個MEMSTORE,記錄中的值屬于不同的COLOUMN FAMILY的,會保存到不同的MEMSTORE中。MEMSTORE中的值不會馬上FLUSH到文件中,而是到MEMSTORE滿的時候再FLUSH,且FLUSH的時候不會寫入已存在的HFILE中,而是新增一個HFILE去保存。另外會寫WRITE AHEAD LOG,這是由于新增記錄時不是馬上寫入HFILE的,如果中途出現DOWN機時,則HBASE重啟時會根據這個LOG來恢復數據。
刪除記錄用DELETE。
刪除時并不會將在HFILE中的內容刪除,而是作一標記,然后在查詢的時候可以不取這些記錄。
讀取單條記錄用GET。
讀取的時候會將記錄保存到CAHE中,同樣如果這個表有N個COLOUMN FAMILY,則會產生N個CAHE
,記錄中的值屬于不同的COLOUMN FAMILY的,會保存到不同的CAHE中。這樣下次客戶端再取記錄時會綜合CAHE和MEMSTORE來返回數據。
新增表用HADMIN。
查詢多條記錄用SCAN和FILTER。
- HBASE的分布式計算
為什么會有分布式計算
前面的API是針對ONLINE的應用,即要求低延時的,相當于OLTP。而針對大量數據時這些API就不適用了。
如要針對全表數據進行分析時用SCAN,這樣會將全表數據取回本地,如果數據量在100G時會耗幾個小時,為了節省時間,引入多線程做法,但要引入多線程時,需遵從新算法:將全表數據分成N個段,每段用一個線程處理,處理完后,交結果合成,然后進行分析。
如果數據量在200G或以上時間就加倍了,多線程的方式不能滿足了,因此引入多進程方式,即將計算放在不同的物理機上處理,這時就要考慮每個物理機DOWN機時的處理方式等情況了,HADOOP的MAPREDUCE則是這種分布式計算的框架了,對于應用者而言,只須處理分散和聚合的算法,其他的無須考慮。
HBASE的MAPREDUCE
使用TABLEMAP和TABLEREDUCE。
HBASE的部署架構和組成的組件
架構在HADOOP和ZOOPKEEPER之上。
HBASE的查詢記錄和保存記錄的流程
說見前一編博文。
HBASE作為數據來源地、保存地和共享數據源的處理方式
即相當于數據庫中JOIN的算法:REDUCE SIDE JOIN、MAP SIDE JOIN。
@import url(http://www.tkk7.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);
Hadoop/Hbase是開源版的google Bigtable, GFS, MapReduce的實現,隨著互聯網的發展,大數據的處理顯得越發重要,Hadoop/Hbase的用武之地也越發廣泛。為了更好的使用Hadoop/Hbase系統,需要有一套完善的監控系統,來了解系統運行的實時狀態,做到一切盡在掌握。Hadoop/Hbase有自己非常完善的metrics framework, 里面包種各種維度的系統指標的統計,另外,這套metrics framework設計的也非常不錯,用戶可以很方便地添加自定義的metrics。更為重要的一點是metrics的展示方式,目前它支持三種方式:一種是落地到本地文件,一種是report給Ganglia系統,另一種是通過JMX來展示。本文主要介紹怎么把Hadoop/Hbase的metrics report給Ganglia系統,通過瀏覽器來查看。
介紹后面的內容之前有必要先簡單介紹一下Ganglia系統。Ganglia是一個開源的用于系統監控的系統,它由三部分組成:gmond, gmetad, webfrontend, 三部分是這樣分工的:
gmond: 是一個守護進程,運行在每一個需要監測的節點上,收集監測統計,發送和接受在同一個組播或單播通道上的統計信息
gmetad: 是一個守護進程,定期檢查gmond,從那里拉取數據,并將他們的指標存儲在RRD存儲引擎中
webfrontend: 安裝在有gmetad運行的機器上,以便讀取RRD文件,用來做前臺展示
簡單總結它們三者的各自的功用,gmond收集數據各個node上的metrics數據,gmetad匯總gmond收集到的數據,webfrontend在前臺展示gmetad匯總的數據。Ganglia缺省是對系統的一些metric進行監控,比如cpu/memory/net等。不過Hadoop/Hbase內部做了對Ganglia的支持,只需要簡單的改配置就可以將Hadoop/Hbase的metrics也接入到ganglia系統中進行監控。
接下來介紹如何把Hadoop/Hbase接入到Ganglia系統,這里的Hadoop/Hbase的版本號是0.94.2,早期的版本可能會有一些不同,請注意區別。Hbase本來是Hadoop下面的子項目,因此所用的metrics framework原本是同一套Hadoop metrics,但后面hadoop有了改進版本的metrics framework:metrics2(metrics version 2), Hadoop下面的項目都已經開始使用metrics2, 而Hbase成了Apache的頂級子項目,和Hadoop成為平行的項目后,目前還沒跟進metrics2,它用的還是原始的metrics.因此這里需要把Hadoop和Hbase的metrics分開介紹。
Hadoop接入Ganglia:
1. Hadoop metrics2對應的配置文件為:hadoop-metrics2.properties
2. hadoop metrics2中引用了source和sink的概念,source是用來收集數據的, sink是用來把source收集的數據consume的(包括落地文件,上報ganglia,JMX等)
3. hadoop metrics2配置支持Ganglia:
#*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
*.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
*.sink.ganglia.period=10
*.sink.ganglia.supportsparse=true
*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
#uncomment as your needs
namenode.sink.ganglia.servers=10.235.6.156:8649
#datanode.sink.ganglia.servers=10.235.6.156:8649
#jobtracker.sink.ganglia.servers=10.0.3.99:8649
#tasktracker.sink.ganglia.servers=10.0.3.99:8649
#maptask.sink.ganglia.servers=10.0.3.99:8649
#reducetask.sink.ganglia.servers=10.0.3.99:8649
這里需要注意的幾點:
(1) 因為Ganglia3.1與3.0不兼容,需要根據Ganglia的版本選擇使用GangliaSink30或者GangliaSink31
(2) period配置上報周期,單位是秒(s)
(3) namenode.sink.ganglia.servers指定Ganglia gmetad所在的host:port,用來向其上報數據
(4) 如果同一個物理機器上同時啟動了多個hadoop進程(namenode/datanode, etc),根據需要把相應的進程的sink.ganglia.servers配置好即可
Hbase接入Ganglia:
1. Hbase所用的hadoop metrics對應的配置文件是: hadoop-metrics.properties
2. hadoop metrics里核心是Context,寫文件有寫文件的TimeStampingFileContext, 向Ganglia上報有GangliaContext/GangliaContext31
3. hadoop metrics配置支持Ganglia:
# Configuration of the "hbase" context for ganglia
# Pick one: Ganglia 3.0 (former) or Ganglia 3.1 (latter)
# hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext
hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
hbase.period=10
hbase.servers=10.235.6.156:8649
這里需要注意幾點:
(1) 因為Ganglia3.1和3.0不兼容,所以如果是3.1以前的版本,需要用GangliaContext, 如果是3.1版的Ganglia,需要用GangliaContext31
(2) period的單位是秒(s),通過period可以配置向Ganglia上報數據的周期
(3) servers指定的是Ganglia gmetad所在的host:port,把數據上報到指定的gmetad
(4) 對rpc和jvm相關的指標都可以進行類似的配置
REGIONS SERVER和TASK TRACKER SERVER不要在同一臺機器上,最好如果有MAPREDUCE JOB運行的話,應該分開兩個CLUSTER,即兩群不同的服務器上,這樣MAPREDUCE 的線下負載不會影響到SCANER這些線上負載。
如果主要是做MAPREDUCE JOB的話,將REGIONS SERVER和TASK TRACKER SERVER放在一起是可以的。
原始集群模式
10個或以下節點,無MAPREDUCE JOB,主要用于低延遲的訪問。每個節點上的配置為:CPU4-6CORE,內存24-32G,4個SATA硬盤。Hadoop NameNode, JobTracker, HBase Master, 和ZooKeeper全都在同一個NODE上。
小型集群模式(10-20臺服務器)
HBase Master放在單獨一臺機器上, 以便于使用較低配置的機器。ZooKeeper也放在單獨一臺機器上,NameNode和JobTracker放在同一臺機器上。
中型集群模式(20-50臺服務器)
由于無須再節省費用,可以將HBase Master和ZooKeeper放在同一臺機器上, ZooKeeper和HBase Master要三個實例。NameNode和JobTracker放在同一臺機器上。
大型集群模式(>50臺服務器)
和中型集群模式相似,但ZooKeeper和HBase Master要五個實例。NameNode和Second NameNode要有足夠大的內存。
HADOOP MASTER節點
NameNode和Second NameNode服務器配置要求:(小型)8CORE CPU,16G內存,1G網卡和SATA 硬盤,中弄再增加多16G內存,大型則再增加多32G內存。
HBASE MASTER節點
服務器配置要求:4CORE CPU,8-16G內存,1G網卡和2個SATA 硬盤,一個用于操作系統,另一個用于HBASE MASTER LOGS。
HADOOP DATA NODES和HBASE REGION SERVER節點
DATA NODE和REGION SERVER應在同一臺服務器上,且不應該和TASK TRACKER在一起。服務器配置要求:8-12CORE CPU,24-32G內存,1G網卡和12*1TB SATA 硬盤,一個用于操作系統,另一個用于HBASE MASTER LOGS。
ZOOPKEEPERS節點
服務器配置和HBASE MASTER相似,也可以與HBASE MASTER放在一起,但就要多增加一個硬盤單獨給ZOOPKEEPER使用。
-Xmx8g—設置HEAP的最大值到8G,不建議設到15 GB.
-Xms8g—設置HEAP的最小值到8GS.
-Xmn128m—設置新生代的值到128 MB,默認值太小。
-XX:+UseParNewGC—設置對于新生代的垃圾回收器類型,這種類型是會停止JAVA進程,然后再進行回收的,但由于新生代體積比較小,持續時間通常只有幾毫秒,因此可以接受。
-XX:+UseConcMarkSweepGC—設置老生代的垃圾回收類型,如果用新生代的那個會不合適,即會導致JAVA進程停止的時間太長,用這種不會停止JAVA進程,而是在JAVA進程運行的同時,并行的進行回收。
-XX:CMSInitiatingOccupancyFraction—設置CMS回收器運行的頻率。
GET、PUT是ONLINE的操作,MAPREDUCE是OFFLINE的操作
HDFS寫流程
客戶端收到要保存文件的請求后,將文件以64M為單位拆成若干份BLOCK,形成一個列表,即由幾個BLOCK組成,將這些信息告訴NAME NODE,我要保存這個,NAME NODE算出一個列表,哪段BLOCK應該寫到哪個DATA NODE,客戶端將第一個BLOCK傳到第一個節點DATA NODE A,通知其保存,同時讓它通知DATA NODE D和DATA NODE B也保存一份,DATA NODE D收到信息后進行了保存,同時通知DATA NODE B保存一份,DATA NODE B保存完成后則通知客戶端保存完成,客戶端再去向NAME NODE中取下一個BLOCK要保存的位置,重復以上的動作,直到所有的BLOCK都保存完成。
HDFS讀流程
客戶端向NAME NODE請求讀一個文件,NAME NODE返回這個文件所構成的所有BLOCK的DATA NODE IP及BLOCK ID,客戶端并行的向各DATA NODE發出請求,要取某個BLOCK ID的BLOCK,DATA NODE發回所要的BLOCK給客戶端,客戶端收集到所有的BLOCK后,整合成一個完整的文件后,此流程結束。
MAPREDUCE流程
輸入數據 -- 非多線程了,而是多進程的挑選數據,即將輸入數據分成多塊,每個進程處理一塊 -- 分組 -- 多進程的匯集數據 -- 輸出
HBASE表結構
HBASE中將一個大表數據分成不同的小表,每個小表叫REGION,存放REGION的服務器叫REGIONSERVER,一個REGIONSERVER可以存放多個REGION。通常REGIONSERVER和DATA NODE是在同一服務器,以減少NETWORK IO。
-ROOT-表存放于MASTER SERVER上,記錄了一共有多少個REGIONSERVER,每個REGION SERVER上都有一個.META.表,上面記錄了本REGION SERVER放有哪幾個表的哪幾個REGION。如果要知道某個表共有幾個REGION,就得去所有的REGION SERVER上查.META.表,進行匯總才能得知。
客戶端如果要查ROW009的信息,先去咨詢ZOOPKEEPER,-ROOT-表在哪里,然后問-ROOT-表,哪個.META.知道這個信息,然后去問.META.表,哪個REGION有這個信息,然后去那個REGION問ROW009的信息,然后那個REGION返回此信息。
HBASE MAPREDUCE
一個REGION一個MAP任務,而任務里的map方法執行多少次,則由查詢出來的記錄有多少條,則執行多少次。
REDUCE任務負責向REGION寫數據,但寫到哪個REGION則由那個KEY歸屬哪個REGION管,則寫到哪個REGION,有可能REDUCE任務會和所有的REGION SERVER交互。
在HBASE的MAPREDUCE JOB中使用JOIN
REDUCE-SIDE JOIN
利用現有的SHUTTLE分組機制,在REDUCE階段做JOIN,但由于MAP階段數據大,可能會有性能問題。
MAP-SIDE JOIN
將數據較少的一表讀到一公共文件中,然后在MPA方法中循環另一表的數據,再將要的數據從公共文件中讀取。這樣可以減少SHUTTLE和SORT的時間,同時也不需要REDUCE任務。
1) 在Reduce階段進行Join,這樣運算量比較小.(這個適合被Join的數據比較小的情況下.)
2) 壓縮字段,對數據預處理,過濾不需要的字段.
3) 最后一步就是在Mapper階段過濾,這個就是Bloom Filter的用武之地了.也就是需要詳細說明的地方.
下面就拿一個我們大家都熟悉的場景來說明這個問題: 找出上個月動感地帶的客戶資費的使用情況,包括接入和撥出.
(這個只是我臆想出來的例子,根據實際的DB數據存儲結構,在這個場景下肯定有更好的解決方案,大家不要太較真哦)
這個時候的兩個個數據集都是比較大的,這兩個數據集分別是:上個月的通話記錄,動感地帶的手機號碼列表.
比較直接的處理方法有2種:
1)在 Reduce 階段,通過動感地帶號碼來過濾. 優點:這樣需要處理的數據相對比較少,這個也是比較常用的方法.
缺點:很多數據在Mapper階段花了老鼻子力氣匯總了,還通過網絡Shuffle到Reduce節點,結果到這個階段給過濾了.
2)在 Mapper 階段時,通過動感地帶號碼來過濾數據. 優點:這樣可以過濾很多不是動感地帶的數據,比如神州行,全球通.這些過濾的數據就可以節省很多網絡帶寬了.
缺點:就是動感地帶的號碼不是小數目,如果這樣處理就需要把這個大塊頭復制到所有的Mapper節點,甚至是Distributed Cache.(Bloom Filter就是用來解決這個問題的)
Bloom Filter就是用來解決上面方法2的缺點的.
方法2的缺點就是大量的數據需要在多個節點復制.Bloom Filter通過多個Hash算法, 把這個號碼列表壓縮到了一個Bitmap里面. 通過允許一定的錯誤率來換空間, 這個和我們平時經常提到的時間和空間的互換類似.詳細情況可以參考:
http://blog.csdn.net/jiaomeng/article/details/1495500
但是這個算法也是有缺陷的,就是會把很多神州行,全球通之類的號碼當成動感地帶.但在這個場景中,這根本不是問題.因為這個算法只是過濾一些號碼,漏網之魚會在Reduce階段進行精確匹配時顧慮掉.
這個方法改進之后基本上完全回避了方法2的缺點:
1) 沒有大量的動感地帶號碼發送到所有的Mapper節點.
2) 很多非動感地帶號碼在Mapper階段就過濾了(雖然不是100%),避免了網絡帶寬的開銷及延時.
繼續需要學習的地方:Bitmap的大小, Hash函數的多少, 以及存儲的數據的多少. 這3個變量如何取值才能才能在存儲空間與錯誤率之間取得一個平衡.
NAME NODE起保存DATA NODE上文件的位置信息用,主要有兩個保存文件:FsImage和EditLog,FsImage保存了上一次NAME NODE啟動時的狀態,EditLog則記錄每次成功后的對HDFS的操作行為。當NAME NODE重啟時,會合并FsImage和EditLog成為一個新的FsImage,清空EditLog,如果EditLog非常大的時候,則NAME NODE啟動的時間會非常長。因此就有SECOND NAME NODE。
SECOND NAME NODE會以HTTP的方式向NAME NODE要這兩個文件,當NAME NODE收到請求時,就會韋一個新的EditLog來記錄,這時SECOND NAME NODE就會將取得的這兩個文件合并,成一個新的FsImage,再發給NAME NODE,NAME NODE收到后,就會以這個為準,舊的就會歸檔不用。
SECOND NAME NODE還有一個用途就是當NAME NODE DOWN了的時候,可以改SECOND NAME NODE的IP為NAME NODE所用的IP,當NAME NODE用。
secondary namenoded 配置很容易被忽視,如果jps檢查都正常,大家通常不會太關心,除非namenode發生問題的時候,才會想起還有個secondary namenode,它的配置共兩步:
- 集群配置文件conf/master中添加secondarynamenode的機器
- 修改/添加 hdfs-site.xml中如下屬性:
<property>
<name>dfs.http.address</name>
<value>{your_namenode_ip}:50070</value>
<description>
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
這兩項配置OK后,啟動集群。進入secondary namenode 機器,檢查fs.checkpoint.dir(core-site.xml文件,默認為${hadoop.tmp.dir}/dfs/namesecondary)目錄同步狀態是否和namenode一致的。
如果不配置第二項則,secondary namenode同步文件夾永遠為空,這時查看secondary namenode的log顯示錯誤為:
2011-06-09 11:06:41,430 INFO org.apache.hadoop.hdfs.server.common.Storage: Recovering storage directory /tmp/hadoop-hadoop/dfs/namesecondary from failed checkpoint.
2011-06-09 11:06:41,433 ERROR org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Exception in doCheckpoint:
2011-06-09 11:06:41,434 ERROR org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:211)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
at java.net.Socket.connect(Socket.java:529)
at java.net.Socket.connect(Socket.java:478)
at sun.net.NetworkClient.doConnect(NetworkClient.java:163)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:394)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:529)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:233)
at sun.net.www.http.HttpClient.New(HttpClient.java:306)
at sun.net.www.http.HttpClient.New(HttpClient.java:323)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:970)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:911)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:836)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1172)
at org.apache.hadoop.hdfs.server.namenode.TransferFsImage.getFileClient(TransferFsImage.java:151)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.downloadCheckpointFiles(SecondaryNameNode.java:256)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.doCheckpoint(SecondaryNameNode.java:313)
at org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.run(SecondaryNameNode.java:225)
at java.lang.Thread.run(Thread.java:662)
可能用到的core-site.xml文件相關屬性:
<property>
<name>fs.checkpoint.period</name>
<value>300</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>${hadoop.tmp.dir}/dfs/namesecondary</value>
<description>Determines where on the local filesystem the DFS secondary
name node should store the temporary images to merge.
If this is a comma-delimited list of directories then the image is
replicated in all of the directories for redundancy.
</description>
</property>
采用Cloudera版本的hadoop/hbase:
hadoop-0.20.2-cdh3u0
hbase-0.90.1-cdh3u0
zookeeper-3.3.3-cdh3u0
默認已支持FairScheduler調度算法.
只需改配置使期用FairSchedule而非默認的JobQueueTaskScheduler即可.
配置fair-scheduler.xml (/$HADOOP_HOME/conf/):
<?xml version="1.0"?>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>[HADOOP_HOME]/conf/fair-scheduler.xml</value>
</property>
<allocations>
<pool name="qiji-task-pool">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
<maxRunningJobs>
<maxRunningJobs>5</maxRunningJobs>
<minSharePreemptionTimeout>300</minSharePreemptionTimeout>
<weight>1.0</weight>
</pool>
<user name="ecap">
<maxRunningJobs>
<maxRunningJobs>6</maxRunningJobs>
</user>
<poolMaxJobsDefault>10</poolMaxJobsDefault>
<userMaxJobsDefault>8</userMaxJobsDefault>
<defaultMinSharePreemptionTimeout>600
</defaultMinSharePreemptionTimeout>
<fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
</allocations>
配置$HADOOP_HOME/conf/mapred-site.xml,最后添加:
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>/opt/hadoop/conf/fair-scheduler.xml</value>
</property>
<property>
<name>mapred.fairscheduler.assignmultiple</name>
<value>true</value>
</property>
<property>
<name>mapred.fairscheduler.sizebasedweight</name>
<value>true</value>
</property>
然后重新運行集群,這樣有幾個Job(上面配置是5個并行)并行運行時,不會因為一個Job把Map/Reduce占滿而使其它Job處于Pending狀態.
可從: http://<masterip>:50030/scheduler查看并行運行的狀態.