Prerequisites
A working, fully configured Hadoop cluster
The Hadoop Eclipse plugin compiled and installed
A local copy of the same Hadoop version as the server, already unpacked (this example uses hadoop-1.1.2)
The Hadoop cluster started

Configure the Hadoop installation directory
In Eclipse, choose Window → Preferences to open the Preferences window, select Hadoop Map/Reduce on the left, and set Hadoop installation directory on the right.

Configure Map/Reduce Locations
In Eclipse, choose Window → Open Perspective → Map/Reduce to open the Map/Reduce perspective.

Click the blue elephant icon in the upper-right corner of the view shown above to open the New Hadoop Location dialog, shown below:

Location name: a name of your choosing for this location
Map/Reduce Master: the value of the mapred.job.tracker property in mapred-site.xml
DFS Master: the value of the fs.default.name property in core-site.xml
User name: the user name used to connect to HDFS on the server
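For reference, the two Master settings mirror the cluster's own configuration files. A typical pairing might look like the sketch below; the HDFS host and port are taken from the run arguments later in this guide, while the JobTracker port 9091 is only an assumption here, so substitute your cluster's actual values:

```xml
<!-- core-site.xml: supplies the DFS Master host/port -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://10.120.10.11:9090</value>
</property>

<!-- mapred-site.xml: supplies the Map/Reduce Master host/port (port is a placeholder) -->
<property>
  <name>mapred.job.tracker</name>
  <value>10.120.10.11:9091</value>
</property>
```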

Once the connection succeeds, the directories in HDFS are listed on the left, as shown in the figure above.
Create a MapReduce project

This example computes the maximum annual temperature from NCDC weather data. Place the temperature data under /user/hadoop/ncdc/input, as shown:

Create the test class MaxTempTest with the following code:
package com.hadoop.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTempTest extends Configured implements Tool {

    public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final int MISSING = 9999;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the year and air temperature out of the fixed-width NCDC record
            String line = value.toString();
            String year = line.substring(15, 19);
            if (line.length() > 92) {
                int airTemp;
                if (line.charAt(87) == '+') {
                    airTemp = Integer.parseInt(line.substring(88, 92));
                } else {
                    airTemp = Integer.parseInt(line.substring(87, 92));
                }
                String quality = line.substring(92, 93);
                if (airTemp != MISSING && quality.matches("[01459]")) {
                    context.write(new Text(year), new IntWritable(airTemp));
                }
            } else {
                // Log malformed (too short) records instead of emitting a bogus temperature
                System.out.println("Malformed record, year: " + year + ", split: "
                        + context.getInputSplit().toString());
            }
        }
    }

    public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Find the maximum temperature for this year
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTempTest(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: MaxTempTest <input path> <output path>");
            return -1;
        }
        Configuration config = getConf();
        FileSystem hdfs = FileSystem.get(config);
        Job job = new Job(config, "Max TempTest");
        // Set the output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setJarByClass(MaxTempTest.class);
        job.setMapperClass(MaxTempMapper.class);   // the Mapper class
        job.setReducerClass(MaxTempReducer.class); // the Reducer class
        // Set the input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        // Delete the output directory if it already exists
        if (hdfs.exists(outputDir)) {
            hdfs.delete(outputDir, true);
        }
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        // Submit the job and wait for it to finish
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
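To make the substring offsets in the mapper concrete, here is a small standalone sketch (not part of the job) that builds a synthetic record in the fixed-width layout the mapper assumes: year at columns 15–18, sign at column 87, temperature at columns 88–91, quality code at column 92. The record itself is fabricated for illustration; real NCDC lines carry many more fields.

```java
public class NcdcParseDemo {

    // Build a synthetic record: all zeros except the fields the mapper reads.
    static String sampleLine() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 94; i++) {
            sb.append('0');
        }
        sb.replace(15, 19, "1950");  // year
        sb.replace(87, 92, "+0022"); // signed temperature, tenths of a degree
        sb.replace(92, 93, "1");     // quality code
        return sb.toString();
    }

    // Same parsing logic as MaxTempMapper.map()
    static String parse(String line) {
        String year = line.substring(15, 19);
        int airTemp;
        if (line.charAt(87) == '+') {
            airTemp = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemp = Integer.parseInt(line.substring(87, 92));
        }
        return year + "\t" + airTemp;
    }

    public static void main(String[] args) {
        // Emits the year and temperature separated by a tab, as the job's output does
        System.out.println(parse(sampleLine()));
    }
}
```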
Run the test project
Right-click the MaxTempTest class → Run As → Run Configurations to open the Run Configurations window; right-click Java Application → New and create a configuration named MaxTempTest, as shown below:

Click the Arguments tab on the right and enter the following in Program arguments:
hdfs://10.120.10.11:9090/user/hadoop/ncdc/input hdfs://10.120.10.11:9090/user/hadoop/ncdc/output
Right-click the MaxTempTest class → Run As → Run on Hadoop; the Select Hadoop location window appears, as shown below:

Select "Choose an existing server from the list below" and click "Finish" to submit the job.
Runtime error 1

Solution:
Locate FileUtil.java under hadoop-1.1.2\src\core\org\apache\hadoop\fs, copy it into the test project keeping the same package name as declared in FileUtil, and comment out the body of its checkReturnValue method, as shown below:
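The patched method ends up looking roughly like the sketch below; the signature is what I recall from the Hadoop 1.1.2 source, so verify it against your copy of FileUtil.java. The point is that the local permission check no longer throws when jobs are launched from Eclipse:

```java
// Inside the copied org.apache.hadoop.fs.FileUtil (sketch, verify against your source):
private static void checkReturnValue(boolean rv, File p, FsPermission permission)
        throws IOException {
    // Original body commented out as a local-development workaround: it threw
    // IOException("Failed to set permissions of path: ...") when rv was false,
    // which aborts jobs submitted from Eclipse on Windows.
}
```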

Runtime error 2

Solution:
Fix the directory permissions on the server by running: hadoop fs -chmod -R 777 ./ncdc

Results

As shown above, the job completed successfully. In Eclipse, right-click the "ncdc" directory under "DFS Locations" on the left and refresh it, then double-click part-r-00000 to view the output, as shown below:
