<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    Implementation for CombineFileInputFormat Hadoop 0.20.205

    運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優化,使多個文件能合成一個MAP JOB的輸入。

    具體的原理是下述三步:

    1.根據輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

    2. 現在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.

    3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
    舉例: mapred.max.split.size=1000
    mapred.min.split.size.per.node=300
    mapred.min.split.size.per.rack=100
    輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
    經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
    由于兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.
    第三步,合并四個碎片成一個split, 長度為150.

    如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.

    其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。

    注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。


    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    @SuppressWarnings("deprecation")
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

            return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
        }

        public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
            private final LineRecordReader linerecord;

            public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                linerecord = new LineRecordReader(conf, filesplit);
            }

            @Override
            public void close() throws IOException {
                linerecord.close();

            }

            @Override
            public LongWritable createKey() {
                // TODO Auto-generated method stub
                return linerecord.createKey();
            }

            @Override
            public Text createValue() {
                // TODO Auto-generated method stub
                return linerecord.createValue();
            }

            @Override
            public long getPos() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getPos();
            }

            @Override
            public float getProgress() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getProgress();
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {

                // TODO Auto-generated method stub
                return linerecord.next(key, value);
            }

        }
    }


    在運行時這樣設置:

    if (argument != null) {
                    conf.set("mapred.max.split.size", argument);
                } else {
                    conf.set("mapred.max.split.size", "134217728"); // 128 MB
                }
    //

                conf.setInputFormat(CombinedInputFormat.class);


    posted on 2013-08-29 16:08 paulwong 閱讀(384) 評論(0)  編輯  收藏 所屬分類: 分布式HADOOP云計算

    主站蜘蛛池模板: 一级毛片在播放免费| A级毛片成人网站免费看| 国产在线19禁免费观看| 男女一进一出抽搐免费视频| 亚洲精品午夜视频| 免费在线观看毛片| 99精品视频免费观看| 男男黄GAY片免费网站WWW| 亚洲成人在线电影| 国产精品另类激情久久久免费| 成人免费区一区二区三区| 国内精品久久久久影院亚洲| 中文字幕亚洲一区二区三区| 最近中文字幕免费mv视频8| 黄色网页在线免费观看| 亚洲一区二区观看播放| 亚洲电影一区二区三区| 亚洲成av人片天堂网老年人| 国产精品入口麻豆免费观看| 亚洲高清免费视频| 亚洲欧美国产欧美色欲| 亚洲熟妇av一区| 亚洲精品无码专区久久久| 国产大片51精品免费观看| 免费在线观看h片| 久久免费视频观看| yellow视频免费看| 亚洲精品国产首次亮相| 亚洲国产午夜精品理论片 | 久久久久久久尹人综合网亚洲| 最近中文字幕免费mv视频7| 57pao国产成永久免费视频| 一本一道dvd在线观看免费视频| 亚洲成a人片在线看| 亚洲高清资源在线观看| 亚洲国产无套无码av电影| 亚洲国产精品一区二区九九 | 一二三四在线播放免费观看中文版视频 | 又黄又大又爽免费视频| 毛片免费视频在线观看| 91精品成人免费国产片|