<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 495,comments - 227,trackbacks - 0
    http://www.rigongyizu.com/hadoop-job-optimize-combinefileinputformat/

    某日,接手了同事寫的從Hadoop集群拷貝數據到另外一個集群的程序,該程序是運行在Hadoop集群上的job。這個job只有map階段,讀取hdfs目錄下數據的數據,然后寫入到另外一個集群。

    顯然,這個程序沒有考慮大數據量的情況,如果輸入目錄下文件很多或數據量很大,就會導致map數很多。而實際上我們需要拷貝的一個數據源就有近 6T,job啟動起來有1w多個map,一下子整個queue的資源就占滿了。雖然通過調整一些參數可以控制map數(也就是并發數),但是無法準確的控 制map數,而且換個數據源又得重新配置參數。

    第一個改進的版本是,加了Reduce過程,以期望通過設置Reduce數量來控制并發數。這樣雖然能精確地控制并發數,但是增加了shuffle 過程,實際運行中發現輸入數據有傾斜(而partition的key由于業務需要無法更改),導致部分機器網絡被打滿,從而影響到了集群中的其他應用。即 使通過 mapred.reduce.parallel.copies 參數來限制shuffle也是治標不治本。這個平白增加的shuffle過程實際上浪費了很多網絡帶寬和IO。

    最理想的情況當然是只有map階段,而且能夠準確的控制并發數了。

    于是,第二個優化版本誕生了。這個job只有map階段,采用CombineFileInputFormat, 它可以將多個小文件打包成一個InputSplit提供給一個Map處理,避免因為大量小文件問題,啟動大量map。通過 mapred.max.split.size 參數可以大概地控制并發數。本以為這樣就能解決問題了,結果又發現了數據傾斜的問題。這種粗略地分splits的方式,導致有的map處理的數據少,有的 map處理的數據多,并不均勻。幾個拖后退的map就導致job的實際運行時間長了一倍多。

    看來只有讓每個map處理的數據量一樣多,才能完美的解決這個問題了。

    第三個版本也誕生了,這次是重寫了CombineFileInputFormat,自己實現getSplits方法。由于輸入數據為SequenceFile格式,因此需要一個SequenceFileRecordReaderWrapper類。

    實現代碼如下:
    CustomCombineSequenceFileInputFormat.java

    import java.io.IOException;
     
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
     
    /**
     * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
     * <code>SequenceFileInputFormat</code>.
     *
     * @see CombineFileInputFormat
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {
        @SuppressWarnings({"rawtypes", "unchecked"})
        public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException {
            return new CombineFileRecordReader((CombineFileSplit) split, context,
                    SequenceFileRecordReaderWrapper.class);
        }
     
        /**
         * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
         * used in a <code>CombineFileInputFormat</code>-equivalent for
         * <code>SequenceFileInputFormat</code>.
         *
         * @see CombineFileRecordReader
         * @see CombineFileInputFormat
         * @see SequenceFileInputFormat
         */
        private static class SequenceFileRecordReaderWrapper<K, V>
                extends CombineFileRecordReaderWrapper<K, V> {
            // this constructor signature is required by CombineFileRecordReader
            public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                    Integer idx) throws IOException, InterruptedException {
                super(new SequenceFileInputFormat<K, V>(), split, context, idx);
            }
        }
    }

    MultiFileInputFormat.java

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
     
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
     
    /**
     * multiple files can be combined in one InputSplit so that InputSplit number can be limited!
     */
    public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
     
        private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);
        public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
        public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;
     
        public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
            job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
        }
     
        @Override
        public List<InputSplit> getSplits(JobContext job) throws IOException {
            // get all the files in input path
            List<FileStatus> stats = listStatus(job);
            List<InputSplit> splits = new ArrayList<InputSplit>();
            if (stats.size() == 0) {
                return splits;
            }
            // 計算split的平均長度
            long totalLen = 0;
            for (FileStatus stat : stats) {
                totalLen += stat.getLen();
            }
            int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
            int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
            long averageLen = totalLen / expectSplitNum;
            LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                    + ") expectSplitNum(" + expectSplitNum + ") ");
            // 設置inputSplit
            List<Path> pathLst = new ArrayList<Path>();
            List<Long> offsetLst = new ArrayList<Long>();
            List<Long> lengthLst = new ArrayList<Long>();
            long currentLen = 0;
            for (int i = 0; i < stats.size(); i++) {
                FileStatus stat = stats.get(i);
                pathLst.add(stat.getPath());
                offsetLst.add(0L);
                lengthLst.add(stat.getLen());
                currentLen += stat.getLen();
                if (splits.size() < expectSplitNum - 1   && currentLen > averageLen) {
                    Path[] pathArray = new Path[pathLst.size()];
                    CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                        getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                    LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                            + ") length(" + currentLen + ")");
                    splits.add(thissplit);
                    //
                    pathLst.clear();
                    offsetLst.clear();
                    lengthLst.clear();
                    currentLen = 0;
                }
            }
            if (pathLst.size() > 0) {
                Path[] pathArray = new Path[pathLst.size()];
                CombineFileSplit thissplit =
                        new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                                getLongArray(lengthLst), new String[0]);
                LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                        + ") length(" + currentLen + ")");
                splits.add(thissplit);
            }
            return splits;
        }
     
        private long[] getLongArray(List<Long> lst) {
            long[] rst = new long[lst.size()];
            for (int i = 0; i < lst.size(); i++) {
                rst[i] = lst.get(i);
            }
            return rst;
        }
    }

    通過 multifileinputformat.max_split_num 參數就可以較為準確的控制map數量,而且會發現每個map處理的數據量很均勻。至此,問題總算解決了。

    posted on 2014-09-16 09:25 SIMONE 閱讀(683) 評論(1)  編輯  收藏 所屬分類: hadoop

    FeedBack:
    # re: 一個Hadoop程序的優化過程 – 根據文件實際大小實現CombineFileInputFormat[未登錄]
    2014-12-20 11:34 | 哈哈
    看了樓主的代碼,但是這種做法以文件為單位,一個文件至多分給一個map處理。如果某個目錄下有許多小文件, 另外還有一個超大文件, 處理大文件的map會嚴重偏慢,這個該怎么辦呢?  回復  更多評論
      
    主站蜘蛛池模板: 免费看的一级毛片| 国产亚洲精品资源在线26u| 免费精品久久久久久中文字幕| 亚洲AV成人潮喷综合网| 黄页免费在线观看| 亚洲成A人片在线播放器| 免费欧洲美女牲交视频| 一级毛片免费毛片一级毛片免费| 伊人久久亚洲综合影院首页| 色噜噜AV亚洲色一区二区| 中国人xxxxx69免费视频| 免费看一级一级人妻片 | 国产精品免费网站| 男男gay做爽爽的视频免费| 亚洲欧洲成人精品香蕉网| 色窝窝免费一区二区三区| 精品无码国产污污污免费网站国产| 亚洲精品亚洲人成在线观看麻豆 | 亚洲精品成人久久| 亚洲av日韩av欧v在线天堂| 性色午夜视频免费男人的天堂| MM1313亚洲国产精品| 亚洲黄色三级视频| 精品国产亚洲男女在线线电影 | 亚洲国产成人在线视频| 亚洲色偷偷狠狠综合网| 成人性生交视频免费观看| a级毛片视频免费观看| 在线91精品亚洲网站精品成人| 亚洲美女人黄网成人女| 久久亚洲国产精品123区| 成人免费视频一区二区| 国产精品久免费的黄网站| 亚洲免费在线视频播放| 亚洲免费视频一区二区三区| 亚洲欧美国产国产一区二区三区 | 免费看无码自慰一区二区| 免费无码成人AV在线播放不卡| 一级毛片在线完整免费观看| 久久久久久亚洲精品无码| 亚洲一区精品视频在线|