<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    Implementation for CombineFileInputFormat Hadoop 0.20.205

    運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優化,使多個文件能合成一個MAP JOB的輸入。

    具體的原理是下述三步:

    1.根據輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

    2. 現在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.

    3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
    舉例: mapred.max.split.size=1000
    mapred.min.split.size.per.node=300
    mapred.min.split.size.per.rack=100
    輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
    經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
    由于兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.
    第三步,合并四個碎片成一個split, 長度為150.

    如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.

    其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。

    注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。


    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    @SuppressWarnings("deprecation")
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

            return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
        }

        public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
            private final LineRecordReader linerecord;

            public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                linerecord = new LineRecordReader(conf, filesplit);
            }

            @Override
            public void close() throws IOException {
                linerecord.close();

            }

            @Override
            public LongWritable createKey() {
                // TODO Auto-generated method stub
                return linerecord.createKey();
            }

            @Override
            public Text createValue() {
                // TODO Auto-generated method stub
                return linerecord.createValue();
            }

            @Override
            public long getPos() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getPos();
            }

            @Override
            public float getProgress() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getProgress();
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {

                // TODO Auto-generated method stub
                return linerecord.next(key, value);
            }

        }
    }


    在運行時這樣設置:

    if (argument != null) {
                    conf.set("mapred.max.split.size", argument);
                } else {
                    conf.set("mapred.max.split.size", "134217728"); // 128 MB
                }
    //

                conf.setInputFormat(CombinedInputFormat.class);


    posted on 2013-08-29 16:08 paulwong 閱讀(384) 評論(0)  編輯  收藏 所屬分類: 分布式HADOOP云計算

    主站蜘蛛池模板: 免费无码国产V片在线观看| 亚洲一区二区三区免费视频| 亚洲人成色77777| 猫咪免费人成网站在线观看| 亚洲欧美成人一区二区三区| JLZZJLZZ亚洲乱熟无码| 久草免费手机视频| 亚洲熟妇无码一区二区三区| 亚洲色偷偷偷鲁综合| 国产在线国偷精品产拍免费| 一级毛片免费视频网站| 亚洲理论精品午夜电影| 伊人久久亚洲综合影院| 2021精品国产品免费观看| 亚洲国产精品成人综合色在线| 亚洲尤码不卡AV麻豆| 国产精品成人免费一区二区 | 在线看片韩国免费人成视频| 直接进入免费看黄的网站| 国产亚洲精品a在线无码| 成全高清视频免费观看| 大地影院MV在线观看视频免费| 亚洲中文无码亚洲人成影院| 国产亚洲成av片在线观看| 国产成人免费网站在线观看| 91热久久免费精品99| 一个人看的www免费高清| 国产精品亚洲综合久久| 久久亚洲精品视频| 日本高清免费中文字幕不卡| 91精品成人免费国产片| 精品无码一级毛片免费视频观看 | 在线观看亚洲AV日韩A∨| 亚洲av福利无码无一区二区| 四虎永久免费网站免费观看| 国产人成免费视频网站| 久章草在线精品视频免费观看 | 嫖丰满老熟妇AAAA片免费看| a视频免费在线观看| 一级毛片不卡免费看老司机| 九九精品国产亚洲AV日韩|