http://jeffxie.blog.51cto.com/1365360/305538
我在
Hadoop的用戶郵件列表中看到一些國內的用 戶在訊問一些關于如何操作的
HBase的
問題,還看到了HBase中沒有Example。覺得有 必要跟大家
分享自己的經驗。
在下面的例子中我們分析
Apache的log并把這些log進行分析并把分析完 的結果按用戶IP為ROW,把log中用戶的訪問時間,請求方法,用戶請求的協議,用戶的瀏覽器,服務狀態等寫到HBase的表中。
首先我們要在HBase中建立我們的一個表來存儲
數據。
- public static void creatTable(String table) throws IOException{
- HConnection conn = HConnectionManager.getConnection(conf);
- HBaseAdmin admin = new HBaseAdmin(conf);
- if(!admin.tableExists(new Text(table))){
- System.out.println("1. " + table + " table creating ... please wait");
- HTableDescriptor tableDesc = new HTableDescriptor(table);
- tableDesc.addFamily(new HColumnDescriptor("http:"));
- tableDesc.addFamily(new HColumnDescriptor("url:"));
- tableDesc.addFamily(new HColumnDescriptor("referrer:"));
- admin.createTable(tableDesc);
- } else {
- System.out.println("1. " + table + " table already exists.");
- }
- System.out.println("2. access_log files fetching using map/reduce");
- }
復制代碼 然后我們
運行一個MapReduce任務來取得log中的每一行 數據。因為我們只要取得數據而不需要對結果進行規約,我們只要編寫一個Map
程序即可。
- public static class MapClass extends MapReduceBase implements
- Mapper<WritableComparable, Text, Text, Writable> {
-
- @Override
- public void configure(JobConf job) {
- tableName = job.get(TABLE, "");
- }
-
- public void map(WritableComparable key, Text value,
- OutputCollector<Text, Writable> output, Reporter reporter)
- throws IOException {
- try {
- AccessLogParser log = new AccessLogParser(value.toString());
- if(table==null)
- table = new HTable(conf, new Text(tableName));
- long lockId = table.startUpdate(new Text(log.getIp()));
- table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
- table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
- table.put(lockId, new Text("http:code"), log.getCode().getBytes());
- table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
- table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
- table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
- table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
-
- table.commit(lockId, log.getTimestamp());
- } catch (ParseException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
復制代碼 我們在Map程序中對于傳進來的每一行先交給AccessLogParser去處理在AccessLogParser德構造器中用一個正則表達式"([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\" " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*"來匹配每一行的log。接下來我們把這些AccessLogParser處理出來的結果更新到HBase的表中去,好的, 我們的程序寫完了。我們要啟動一個MapReduce的話我們要對工作進行配置。
- public static void runMapReduce(String table,String dir) throws IOException{
- Path tempDir = new Path("log/temp");
- Path InputDir = new Path(dir);
- FileSystem fs = FileSystem.get(conf);
- JobConf jobConf = new JobConf(conf, LogFetcher.class);
- jobConf.setJobName("apache log fetcher");
- jobConf.set(TABLE, table);
- Path[] in = fs.listPaths(InputDir);
- if (fs.isFile(InputDir)) {
- jobConf.setInputPath(InputDir);
- } else {
- for (int i = 0; i < in.length; i++) {
- if (fs.isFile(in[i])) {
- jobConf.addInputPath(in[i]);
- } else {
- Path[] sub = fs.listPaths(in[i]);
- for (int j = 0; j < sub.length; j++) {
- if (fs.isFile(sub[j])) {
- jobConf.addInputPath(sub[j]);
- }
- }
- }
- }
- }
- jobConf.setOutputPath(tempDir);
- jobConf.setMapperClass(MapClass.class);
-
- JobClient client = new JobClient(jobConf);
- ClusterStatus cluster = client.getClusterStatus();
- jobConf.setNumMapTasks(cluster.getMapTasks());
- jobConf.setNumReduceTasks(0);
-
- JobClient.runJob(jobConf);
- fs.delete(tempDir);
- fs.close();
- }
復制代碼 在上面的代碼中我們先產生一個jobConf對象,然后設定我們的InputPath和OutputPath,告訴MapReduce我們的Map類,設 定我們用多少個Map任務和Reduce任務,然后我們不任務提交給JobClient,關于MapReduce跟詳細的
資料在
Hadoop Wiki上。
下載:源碼和已編譯好的
jar文件
example-src.tgz 例子的運行命令是:
bin/hadoop jar examples.jar logfetcher <access_log file or directory> <table_name>
如何運行上面的應用程序呢?我們假定解壓縮完Hadoop分發包的目錄為%HADOOP%
拷貝%HADOOP%\contrib\hbase\bin下的文件到%HADOOP%\bin下,拷貝%HADOOP%\contrib\hbase \conf的文件到%HADOOP%\conf下,拷貝%HADOOP%\src\contrib\hbase\lib的文件到%HADOOP%\lib 下,拷貝%HADOOP%\src\contrib\hbase\hadoop-*-hbase.jar的文件到%HADOOP%\lib下.然后編輯配 置文件hbase-site.xml設定你的hbase.master例子:192.168.2.92:60000。把這些文件分發到運行Hadoop的 機器上去。在regionservers文件添加上這些已分發過的地址。運行bin/start-hbase.sh命令啟動HBase,把你的 apache log文件拷貝到HDFS的apache-log目錄下,等啟動完成后運行下面的命令。
bin/hadoop jar examples.jar logfetcher apache-log apache
訪問
http://localhost:50030/能 看到你的MapReduce任務的運行情況,訪問
http://localhost:60010/能 看到HBase的運行情況。
等任務MapReduce完成后訪問
http://localhost:60010/hql.jsp,在Query輸入框中輸入 SELECT * FROM apache limit=50;。將會看到已經插入表中的數據。

posted on 2013-02-22 14:12
SIMONE 閱讀(2465)
評論(0) 編輯 收藏 所屬分類:
hbase