<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    march alex's blog
    hello,I am march alex
    posts - 52,comments - 7,trackbacks - 0
    MapReduce是一種可用于數(shù)據(jù)處理的變成模型。
    這里主要講述Java語言實現(xiàn)MapReduce。
    一個投票模型:
    Jobs很喜歡給女生打分,好看的打100分,難看的打0分。有一次他給Lucy打了0分,結果被Lucy痛打了一頓。
    還有一次,Jobs給兩個美女打分,給美女Alice打了99分,給美女Candice打了98分。
    這個時候Alex就看不下去了,他于是站起來說:“明明是Candice比較好看嘛!”。
    兩人于是爭執(zhí)起來,為了證明自己的觀點,結果爆發(fā)了一場大戰(zhàn)!什么降龍十八掌啊,黯然銷魂掌啊,他們都不會。
    那么怎么才能讓對方輸?shù)男姆诜??他們想?#8220;群眾的眼睛是雪亮的”!于是他們發(fā)動了班上的20名同學進行投票。
    結果出來了,Alice的平均分是98.5,Candice的平均分是99.7,以壓倒性的優(yōu)勢獲得了勝利。
    但是Jobs不服,于是把班上每個女生的照片放到了網上,讓大家來評分,最后他得到了好多文件,用自己的電腦算根本忙不過來,于是他想到了用Hadoop寫一個MapReduce程序。
    一直輸入文件的格式是:"[user]\t[girlname]\t[point]".例:
    alex    alice   88
    alex    candice 100
    jobs    alice   100
    john    lucy    89

    在這里,我們假設每個人的評分都為0到100的整數(shù),最終的結果向下取整。那么MapReduce程序可以寫成如下:

    我們需要三樣東西:一個map函數(shù),一個reduce函數(shù),和一些用來運行作業(yè)的代碼。
    map函數(shù)由Mapper接口實現(xiàn)來表示,后者聲明了一個map()方法。
    AverageMapper.java

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class AverageMapper extends MapReduceBase
      implements Mapper<IntWritable, Text, Text, IntWritable> {
            
            public void map(IntWritable key, Text value,
                            OutputCollector<Text, IntWritable> output, Reporter reporter)
                            throws IOException {
                    
                    String s = value.toString();
                    String name = new String();
                    int point = 0;
                    int i;
                    for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                    for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                            name += s.charAt(i);
                    }
                    for(i++;i<s.length();i++) {
                            point = point * 10 + (s.charAt(i) - '0');
                    }
                    if(name.length() != 0 && point >=0 && point <= 100) {
                            output.collect(new Text(name), new IntWritable(point));
                    }
            }
    }

    該Mapper接口是一個泛型類型,他有四個形參類型,分別指定map函數(shù)的輸入鍵、輸入值、輸出鍵、輸出值的類型。
    以示例來說,輸入鍵是一個整形偏移量(表示行號),輸入值是一行文本,輸出鍵是美女的姓名,輸出值是美女的得分。
    Hadoop自身提供一套可優(yōu)化網絡序列化傳輸?shù)幕绢愋?,而不直接使用Java內嵌的類型。這些類型均可在org.apache.hadoop.io包中找到。
    這里我們使用IntWritable類型(相當于Java中的Integer類型)和Text類型(想到與Java中的String類型)。
    map()方法還提供了OutputCollector示例用于輸出內容的寫入。
    我們只在輸入內容格式正確的時候才將其寫入輸出記錄中。

    reduce函數(shù)通過Reducer進行類似的定義。
    AverageReducer.java

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;


    public class AverageReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
            
            public void reduce(Text key, Iterator<IntWritable> values,
                            OutputCollector<Text, IntWritable> output, Reporter reporter)
                            throws IOException {
                    
                    long tot_point = 0, num = 0;
                    while(values.hasNext()) {
                            tot_point += values.next().get();
                            num ++;
                    }
                    int ave_point = (int)(tot_point/num);
                    output.collect(key, new IntWritable(ave_point));
            }
    }

    第三部分代碼負責運行MapReduce作業(yè)。
    Average.java
    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;


    public class Average {
            public static void main(String[] args) throws IOException {
                    if(args.length != 2) {
                            System.err.println("Usage: Average <input path> <output path>");
                            System.exit(-1);
                    }
                    JobConf conf = new JobConf(Average.class);
                    conf.setJobName("Average");
                    FileInputFormat.addInputPath(conf, new Path(args[0]));
                    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                    conf.setMapperClass(AverageMapper.class);
                    conf.setReducerClass(AverageReducer.class);
                    conf.setOutputKeyClass(Text.class);
                    conf.setOutputValueClass(IntWritable.class);
            }
    }

    JobConf對象指定了作業(yè)執(zhí)行規(guī)范。我們可以用它來控制整個作業(yè)的運行。
    在Hadoop作業(yè)上運行著寫作業(yè)時,我們需要將代碼打包成一個JAR文件(Hadoop會在集群上分發(fā)這些文件)。
    我們無需明確指定JAR文件的名稱,而只需在JobConf的構造函數(shù)中傳遞一個類,Hadoop將通過該類查找JAR文件進而找到相關的JAR文件。
    構造JobCOnf對象之后,需要指定輸入和輸出數(shù)據(jù)的路徑。
    調用FileInputFormat類的靜態(tài)方法addInputPath()來定義輸入數(shù)據(jù)的路徑。
    可以多次調用addInputOath()實現(xiàn)多路徑的輸入。
    調用FileOutputFormat類的靜態(tài)函數(shù)SetOutputPath()來指定輸出路徑。
    該函數(shù)指定了reduce函數(shù)輸出文件的寫入目錄。
    在運行任務前該目錄不應該存在,否則Hadoop會報錯并拒絕運行該任務。
    這種預防措施是為了防止數(shù)據(jù)丟失。
    接著,通過SetMapperClass()和SetReducerClass()指定map和reduce類型。
    輸入類型通過InputFormat類來控制,我們的例子中沒有設置,因為使用的是默認的TextInputFormat(文本輸入格式)。

    新增的Java MapReduce API


    新的Hadoop在版本0.20.0包含有一個新的Java MapReduce API,又是也稱為"上下文對象"(context object),旨在使API在今后更容易擴展。
    新特性:
    傾向于使用虛類,而不是接口;
    新的API放在org.apache.hadoop.mapreduce包中(舊版本org.apache.hadoop.mapred包);
    新的API充分使用上下文對象,使用戶代碼能與MapReduce通信。例如:MapContext基本具備了JobConf,OutputCollector和Reporter的功能;
    新的API同時支持“推”(push)和“拉”(pull)的迭代。這兩類API,均可以將鍵/值對記錄推給mapper,但除此之外,新的API也允許把記錄從map()方法中拉出、對reducer來說是一樣的。拉式處理的好處是可以實現(xiàn)批量處理,而非逐條記錄的處理。
    新的API實現(xiàn)了配置的統(tǒng)一。所有作業(yè)的配置均通過Configuration來完成。(區(qū)別于舊API的JobConf)。
    新API中作業(yè)控制由Job類實現(xiàn),而非JobClient類,新API中刪除了JobClient類。
    輸出文件的命名稍有不同。

    用新上下文對象來重寫Average應用

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class NewAverage {
            
            static class NewAverageMapper
              extends Mapper<IntWritable, Text, Text, IntWritable> {
                    
                    public void map(IntWritable key, Text value, Context context) 
                            throws IOException, InterruptedException {
                            
                            String s = value.toString();
                            String name = new String();
                            int point = 0;
                            int i;
                            for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                            for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                                    name += s.charAt(i);
                            }
                            for(i++;i<s.length();i++) {
                                    point = point * 10 + (s.charAt(i) - '0');
                            }
                            if(name.length() != 0 && point >=0 && point <= 100) {
                                    context.write(new Text(name), new IntWritable(point));
                            }
                    }
            }
            
            static class NewAverageReducer
              extends Reducer<Text, IntWritable, Text, IntWritable> {
                    
                    public void reduce(Text key, Iterable<IntWritable> values,
                                    Context context)
                                    throws IOException, InterruptedException {
                            
                            long tot_point = 0, num = 0;
                            for(IntWritable value : values) {
                                    tot_point += value.get();
                                    num ++;
                            }
                            int ave_point = (int)(tot_point/num);
                            context.write(key, new IntWritable(ave_point));
                    }
            }
            
            public static void main(String[] args) throws Exception {
                    if(args.length != 2) {
                            System.err.println("Usage: NewAverage <input path> <output path>");
                            System.exit(-1);
                    }
                    Job job = new Job();
                    job.setJarByClass(NewAverage.class);
                    
                    FileInputFormat.addInputPath(job, new Path(args[0]));
                    FileOutputFormat.setOutputPath(job, new Path(args[1]));
                    
                    job.setMapperClass(NewAverageMapper.class);
                    job.setReducerClass(NewAverageReducer.class);
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(IntWritable.class);
                    
                    System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
    }
    posted on 2015-03-08 13:27 marchalex 閱讀(1381) 評論(0)  編輯  收藏 所屬分類: java小程序
    主站蜘蛛池模板: 亚洲日韩亚洲另类激情文学| 国产精品亚洲专一区二区三区| 国产在线观看片a免费观看| 亚洲色少妇熟女11p| 亚洲精品一级无码鲁丝片| 日韩免费视频一区二区| 亚洲色成人网站WWW永久四虎 | 国内精品久久久久影院亚洲| 亚洲片一区二区三区| 99久9在线|免费| 国产亚洲情侣久久精品| 亚洲尹人九九大色香蕉网站| 日韩成人免费aa在线看| 热久久这里是精品6免费观看 | 未满十八18禁止免费无码网站 | 中文字幕免费在线观看| 色综合久久精品亚洲国产| 亚洲天堂一区二区| 亚洲成av人片在线观看天堂无码 | 免费在线观看亚洲| 亚洲国产中文在线二区三区免| 免费a级黄色毛片| 四虎在线成人免费网站| ww在线观视频免费观看w| 天堂亚洲国产中文在线| 亚洲va在线va天堂va四虎| 免费大片黄手机在线观看 | 中文字幕亚洲综合久久男男| 成人免费在线看片| 免费人成黄页在线观看日本| 黄页视频在线观看免费| 亚洲人成77777在线观看网| 亚洲国产天堂在线观看| 亚洲一区二区三区无码中文字幕| 色吊丝最新永久免费观看网站| 免费A级毛片无码视频| 中国人免费观看高清在线观看二区| 亚洲色大成网站www永久男同| 亚洲精品**中文毛片| 久久久亚洲精品无码| 国产偷国产偷亚洲高清日韩|