<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    Implementation for CombineFileInputFormat Hadoop 0.20.205

    運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認(rèn)情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優(yōu)化,使多個文件能合成一個MAP JOB的輸入。

    具體的原理是下述三步:

    1.根據(jù)輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

    2. 現(xiàn)在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.

    3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
    舉例: mapred.max.split.size=1000
    mapred.min.split.size.per.node=300
    mapred.min.split.size.per.rack=100
    輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
    經(jīng)過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
    由于兩個rack下的碎片和都不超過100, 所以經(jīng)過第二步, split和碎片都沒有變化.
    第三步,合并四個碎片成一個split, 長度為150.

    如果要減少map數(shù)量, 可以調(diào)大mapred.max.split.size, 否則調(diào)小即可.

    其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。

    注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。


    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    @SuppressWarnings("deprecation")
    public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

            return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
        }

        public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
            private final LineRecordReader linerecord;

            public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                linerecord = new LineRecordReader(conf, filesplit);
            }

            @Override
            public void close() throws IOException {
                linerecord.close();

            }

            @Override
            public LongWritable createKey() {
                // TODO Auto-generated method stub
                return linerecord.createKey();
            }

            @Override
            public Text createValue() {
                // TODO Auto-generated method stub
                return linerecord.createValue();
            }

            @Override
            public long getPos() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getPos();
            }

            @Override
            public float getProgress() throws IOException {
                // TODO Auto-generated method stub
                return linerecord.getProgress();
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {

                // TODO Auto-generated method stub
                return linerecord.next(key, value);
            }

        }
    }


    在運行時這樣設(shè)置:

    if (argument != null) {
                    conf.set("mapred.max.split.size", argument);
                } else {
                    conf.set("mapred.max.split.size", "134217728"); // 128 MB
                }
    //

                conf.setInputFormat(CombinedInputFormat.class);


    posted on 2013-08-29 16:08 paulwong 閱讀(390) 評論(0)  編輯  收藏 所屬分類: 分布式HADOOP云計算

    主站蜘蛛池模板: 好久久免费视频高清| 97在线视频免费公开视频| 亚洲精品午夜国产VA久久成人 | 久久亚洲精品专区蓝色区| 久久精品国产亚洲AV电影| 亚洲国产精品久久66| 亚洲色图在线观看| 久久久久亚洲av无码专区喷水| 久久久久亚洲av无码专区蜜芽| 久久精品国产亚洲香蕉| 亚洲an天堂an在线观看| 久久久久亚洲AV成人片| 亚洲天堂一区二区三区四区| 亚洲AV无码一区二区三区人| 亚洲综合色视频在线观看| 亚洲欧洲国产成人综合在线观看 | 成年美女黄网站18禁免费| 成年人网站在线免费观看| 国产精品色午夜视频免费看| 又粗又硬免费毛片| 自拍偷自拍亚洲精品被多人伦好爽 | 美女视频黄频a免费大全视频| 无码日韩人妻AV一区免费l| 亚洲一区二区三区深夜天堂| 亚洲色大18成人网站WWW在线播放 亚洲色大成WWW亚洲女子 | 免费黄色电影在线观看| 1000部拍拍拍18勿入免费视频下载 | 中文字幕在线观看免费| 女bbbbxxxx另类亚洲| yellow免费网站| 97人妻精品全国免费视频| 足恋玩丝袜脚视频免费网站| 成人午夜视频免费| 国产成人亚洲综合无码| 亚洲天堂男人天堂| 亚洲精品国产高清在线观看| yellow免费网站| 国产电影午夜成年免费视频| 日本高清高色视频免费| 日韩视频在线精品视频免费观看| 91麻豆国产免费观看|