《Hadoop in action》由Manning出版,磕磕絆絆總算是看完了。書(shū)的內(nèi)容就不做介紹,主要講一下實(shí)踐的過(guò)程。并且在實(shí)踐過(guò)程中參考的書(shū)籍的部分也會(huì)簡(jiǎn)單介紹。
灰色背景部分為一些介紹,或過(guò)程中出現(xiàn)問(wèn)題的描述,可以直接忽略。
由于公司的業(yè)務(wù)需要,要在網(wǎng)絡(luò)收集網(wǎng)頁(yè)之后對(duì)網(wǎng)頁(yè)進(jìn)行結(jié)構(gòu)化的解析,這個(gè)結(jié)構(gòu)化過(guò)程希望能夠基于HDFS并且使用MR算法實(shí)現(xiàn)。
我虛擬了一個(gè)需求,針對(duì)http://hadoop.apache.org/common/releases.html 頁(yè)面,假設(shè)已經(jīng)下載了頁(yè)面并入庫(kù)。要求最終體現(xiàn)的數(shù)據(jù)是 “版本號(hào)+完整鏈接(即a標(biāo)簽全部?jī)?nèi)容)” 的結(jié)構(gòu)。
偽分布式環(huán)境搭建在虛擬機(jī)上,操作系統(tǒng)是centos5.5,hadoop的版本是1.0.0.
插入書(shū)中的一些環(huán)境搭建的介紹
書(shū)中的2.1節(jié)中介紹了每個(gè)進(jìn)程的作用。2.2節(jié)中的ssh設(shè)置也比較重要,否則好像會(huì)一直提示你輸入密碼。2.3.2節(jié)介紹了偽分布式的配置方式,對(duì)core-site.xml,mapred-site.xml,hdfs-site.xml進(jìn)行配置之后,需要對(duì)namenode節(jié)點(diǎn)進(jìn)行格式化。
bin/hadoop namenode –format
hadoop的根目錄為/usr/local/hadoop-1.0.0,直接啟動(dòng)start-all.sh
[root@localhost hadoop-1.0.0]# ./bin/start-all.sh
啟動(dòng)成功后使用jps命令
jdk小工具jps介紹
jps(Java Virtual Machine Process Status Tool)是JDK 1.5提供的一個(gè)顯示當(dāng)前所有java進(jìn)程pid的命令,簡(jiǎn)單實(shí)用,非常適合在linux/unix平臺(tái)上簡(jiǎn)單察看當(dāng)前java進(jìn)程的一些簡(jiǎn)單情況。 jps存放在JAVA_HOME/bin/jps
[root@localhost hadoop-1.0.0]# jps
5694 SecondaryNameNode
5461 NameNode
5578 DataNode
6027 Jps
5784 JobTracker
5905 TaskTracker
這幾個(gè)進(jìn)程是非常重要的。很多時(shí)候出現(xiàn)意外就是因?yàn)槟稠?xiàng)服務(wù)未啟動(dòng)或異常。可以看到上面的命令上打印出日志位置。出現(xiàn)異常后可以在日志中查看詳細(xì)的堆棧信息。
至此,hadoop已經(jīng)啟動(dòng),環(huán)境已經(jīng)準(zhǔn)備就緒。
下面準(zhǔn)備我們的測(cè)試數(shù)據(jù),將目標(biāo)頁(yè)面的html保存為news.txt,偽分布式也同樣支持hdfs,所以我們使用 fs –put 將news.txt存入hdfs中。
[root@localhost hadoop-1.0.0]# ./bin/hadoop fs -put /mnt/hgfs/shared/news.txt /user/root
[root@localhost hadoop-1.0.0]# ./bin/hadoop fs -lsr /userdrwxr-xr-x - root supergroup 0 2012-04-01 11:22 /user/root
-rw-r--r-- 1 root supergroup 3935 2012-04-01 11:22 /user/root/news.txt
實(shí)現(xiàn)的代碼在eclipse中使用maven打包,上傳至虛擬機(jī)。
文件名com.suntang.analyse.hadoop-0.0.1.jar。
使用hadoop的中的jar命令調(diào)用該jar文件。
[root@localhost hadoop-1.0.0]# ./bin/hadoop jar com.suntang.analyse.hadoop-0.0.1.jar com.suntang.analyse.hadoop.AnalyseJob /user/root/news.txt output_root_test
12/04/01 14:40:04 INFO input.FileInputFormat: Total input paths to process : 1
12/04/01 14:40:05 INFO mapred.JobClient: Running job: job_201204011420_0001
12/04/01 14:40:06 INFO mapred.JobClient: map 0% reduce 0%
12/04/01 14:40:19 INFO mapred.JobClient: map 100% reduce 0%
12/04/01 14:40:31 INFO mapred.JobClient: map 100% reduce 100%
12/04/01 14:40:37 INFO mapred.JobClient: Job complete: job_201204011420_0001
…
此處注意我犯的一個(gè)錯(cuò)誤:
[root@localhost hadoop-1.0.0]# ./bin/hadoop jar com.suntang.analyse.hadoop-0.0.1.jar AnalyseJob -libjars hadoop-core-1.0.0.jar /user/root/news.txt output_root_test
Exception in thread "main" java.lang.ClassNotFoundException: AnalyseJob
提示找不到類,因?yàn)槲彝藢?xiě)完整類名,命令應(yīng)該改為
./bin/hadoop jar com.suntang.analyse.hadoop-0.0.1.jar com.suntang.analyse.hadoop.AnalyseJob -libjars hadoop-core-1.0.0.jar /user/root/news.txt output_root_test 即可。
此處運(yùn)行可能出現(xiàn)另外一個(gè)錯(cuò)誤。在命令行中出現(xiàn)
12/04/01 14:01:38 INFO mapred.JobClient: Task Id : attempt_201204011356_0001_m_000001_0, Status : FAILED
java.lang.Throwable: Child Error
at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Creation of symlink from /mnt/hgfs/shared/hadoop-1.0.0/libexec/../logs/userlogs/job_201204011356_0001/attempt_201204011356_0001_m_000001_0 to 。。。
就不打全了,重點(diǎn)在與
Creation of symlink,看詳細(xì)日志中hadoop-root-tasktracker-localhost.localdomain.log中提示org.apache.hadoop.fs.FileUtil: Command 'ln -s ....': Operation not supported,即ln操作不支持。google可知這個(gè)是由于vm中的共享區(qū)域的問(wèn)題,解決方法就是將hadoop完全轉(zhuǎn)移至linux目錄中。本例中從/mnt/hgfs/shared/hadoop-1.0.0轉(zhuǎn)移至/usr/local/hadoop-1.0.0。
執(zhí)行完成后可在hdfs中查看結(jié)果,查看目錄結(jié)構(gòu)為
-rw-r--r-- 1 root supergroup 0 2012-04-01 14:40 /user/root/output_root_test/_SUCCESS
drwxr-xr-x - root supergroup 0 2012-04-01 14:40 /user/root/output_root_test/_logs
drwxr-xr-x - root supergroup 0 2012-04-01 14:40 /user/root/output_root_test/_logs/history
-rw-r--r-- 1 root supergroup 13634 2012-04-01 14:40 /user/root/output_root_test/_logs/history/job_201204011420_0001_1333262405103_root_ccAnalyseJob
-rw-r--r-- 1 root supergroup 20478 2012-04-01 14:40 /user/root/output_root_test/_logs/history/job_201204011420_0001_conf.xml
-rw-r--r-- 1 root supergroup 3580 2012-04-01 14:40 /user/root/output_root_test/part-r-00000
/user/root/output_root_test/part-r-00000即為最終結(jié)果文件。
=======================================================================
附加AnalyseJob代碼
package com.suntang.analyse.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AnalyseJob extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// super.map(key, value, context);
if (value.toString().matches("<a[^>]*>.*?release.*?</a>"))
context.write(
new Text(value.toString().substring(
value.toString().indexOf("release") + 8,
value.toString().indexOf("available") - 1)),
value);
}
}
public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// super.reduce(arg0, arg1, arg2);
arg2.write(arg0, arg1.iterator().next());
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "myAnalyseJob");
job.setJarByClass(getClass());
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new AnalyseJob(), args);
System.exit(res);
}
}