MapReduce是一種可用于數(shù)據(jù)處理的變成模型。
這里主要講述Java語言實現(xiàn)MapReduce。
一個投票模型:
Jobs很喜歡給女生打分,好看的打100分,難看的打0分。有一次他給Lucy打了0分,結果被Lucy痛打了一頓。
還有一次,Jobs給兩個美女打分,給美女Alice打了99分,給美女Candice打了98分。
這個時候Alex就看不下去了,他于是站起來說:“明明是Candice比較好看嘛!”。
兩人于是爭執(zhí)起來,為了證明自己的觀點,結果爆發(fā)了一場大戰(zhàn)!什么降龍十八掌啊,黯然銷魂掌啊,他們都不會。
那么怎么才能讓對方輸?shù)男姆诜??他們想?#8220;群眾的眼睛是雪亮的”!于是他們發(fā)動了班上的20名同學進行投票。
結果出來了,Alice的平均分是98.5,Candice的平均分是99.7,以壓倒性的優(yōu)勢獲得了勝利。
但是Jobs不服,于是把班上每個女生的照片放到了網上,讓大家來評分,最后他得到了好多文件,用自己的電腦算根本忙不過來,于是他想到了用Hadoop寫一個MapReduce程序。
一直輸入文件的格式是:"[user]\t[girlname]\t[point]".例:
alex alice 88
alex candice 100
jobs alice 100
john lucy 89
在這里,我們假設每個人的評分都為0到100的整數(shù),最終的結果向下取整。那么MapReduce程序可以寫成如下:
我們需要三樣東西:一個map函數(shù),一個reduce函數(shù),和一些用來運行作業(yè)的代碼。
map函數(shù)由Mapper接口實現(xiàn)來表示,后者聲明了一個map()方法。
AverageMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class AverageMapper extends MapReduceBase
implements Mapper<IntWritable, Text, Text, IntWritable> {
public void map(IntWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String s = value.toString();
String name = new String();
int point = 0;
int i;
for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
name += s.charAt(i);
}
for(i++;i<s.length();i++) {
point = point * 10 + (s.charAt(i) - '0');
}
if(name.length() != 0 && point >=0 && point <= 100) {
output.collect(new Text(name), new IntWritable(point));
}
}
}
該Mapper接口是一個泛型類型,他有四個形參類型,分別指定map函數(shù)的輸入鍵、輸入值、輸出鍵、輸出值的類型。
以示例來說,輸入鍵是一個整形偏移量(表示行號),輸入值是一行文本,輸出鍵是美女的姓名,輸出值是美女的得分。
Hadoop自身提供一套可優(yōu)化網絡序列化傳輸?shù)幕绢愋?,而不直接使用Java內嵌的類型。這些類型均可在org.apache.hadoop.io包中找到。
這里我們使用IntWritable類型(相當于Java中的Integer類型)和Text類型(想到與Java中的String類型)。
map()方法還提供了OutputCollector示例用于輸出內容的寫入。
我們只在輸入內容格式正確的時候才將其寫入輸出記錄中。
reduce函數(shù)通過Reducer進行類似的定義。
AverageReducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class AverageReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
long tot_point = 0, num = 0;
while(values.hasNext()) {
tot_point += values.next().get();
num ++;
}
int ave_point = (int)(tot_point/num);
output.collect(key, new IntWritable(ave_point));
}
}
第三部分代碼負責運行MapReduce作業(yè)。
Average.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
public class Average {
public static void main(String[] args) throws IOException {
if(args.length != 2) {
System.err.println("Usage: Average <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(Average.class);
conf.setJobName("Average");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(AverageMapper.class);
conf.setReducerClass(AverageReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
}
}
JobConf對象指定了作業(yè)執(zhí)行規(guī)范。我們可以用它來控制整個作業(yè)的運行。
在Hadoop作業(yè)上運行著寫作業(yè)時,我們需要將代碼打包成一個JAR文件(Hadoop會在集群上分發(fā)這些文件)。
我們無需明確指定JAR文件的名稱,而只需在JobConf的構造函數(shù)中傳遞一個類,Hadoop將通過該類查找JAR文件進而找到相關的JAR文件。
構造JobCOnf對象之后,需要指定輸入和輸出數(shù)據(jù)的路徑。
調用FileInputFormat類的靜態(tài)方法addInputPath()來定義輸入數(shù)據(jù)的路徑。
可以多次調用addInputOath()實現(xiàn)多路徑的輸入。
調用FileOutputFormat類的靜態(tài)函數(shù)SetOutputPath()來指定輸出路徑。
該函數(shù)指定了reduce函數(shù)輸出文件的寫入目錄。
在運行任務前該目錄不應該存在,否則Hadoop會報錯并拒絕運行該任務。
這種預防措施是為了防止數(shù)據(jù)丟失。
接著,通過SetMapperClass()和SetReducerClass()指定map和reduce類型。
輸入類型通過InputFormat類來控制,我們的例子中沒有設置,因為使用的是默認的TextInputFormat(文本輸入格式)。
新增的Java MapReduce API
新的Hadoop在版本0.20.0包含有一個新的Java MapReduce API,又是也稱為"上下文對象"(context object),旨在使API在今后更容易擴展。
新特性:
傾向于使用虛類,而不是接口;
新的API放在org.apache.hadoop.mapreduce包中(舊版本org.apache.hadoop.mapred包);
新的API充分使用上下文對象,使用戶代碼能與MapReduce通信。例如:MapContext基本具備了JobConf,OutputCollector和Reporter的功能;
新的API同時支持“推”(push)和“拉”(pull)的迭代。這兩類API,均可以將鍵/值對記錄推給mapper,但除此之外,新的API也允許把記錄從map()方法中拉出、對reducer來說是一樣的。拉式處理的好處是可以實現(xiàn)批量處理,而非逐條記錄的處理。
新的API實現(xiàn)了配置的統(tǒng)一。所有作業(yè)的配置均通過Configuration來完成。(區(qū)別于舊API的JobConf)。
新API中作業(yè)控制由Job類實現(xiàn),而非JobClient類,新API中刪除了JobClient類。
輸出文件的命名稍有不同。
用新上下文對象來重寫Average應用
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NewAverage {
static class NewAverageMapper
extends Mapper<IntWritable, Text, Text, IntWritable> {
public void map(IntWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s = value.toString();
String name = new String();
int point = 0;
int i;
for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
name += s.charAt(i);
}
for(i++;i<s.length();i++) {
point = point * 10 + (s.charAt(i) - '0');
}
if(name.length() != 0 && point >=0 && point <= 100) {
context.write(new Text(name), new IntWritable(point));
}
}
}
static class NewAverageReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
long tot_point = 0, num = 0;
for(IntWritable value : values) {
tot_point += value.get();
num ++;
}
int ave_point = (int)(tot_point/num);
context.write(key, new IntWritable(ave_point));
}
}
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: NewAverage <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewAverage.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewAverageMapper.class);
job.setReducerClass(NewAverageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
posted on 2015-03-08 13:27
marchalex 閱讀(1381)
評論(0) 編輯 收藏 所屬分類:
java小程序