某日,接手了同事寫的從Hadoop集群拷貝數據到另外一個集群的程序,該程序是運行在Hadoop集群上的job。這個job只有map階段,讀取hdfs目錄下數據的數據,然后寫入到另外一個集群。
顯然,這個程序沒有考慮大數據量的情況,如果輸入目錄下文件很多或數據量很大,就會導致map數很多。而實際上我們需要拷貝的一個數據源就有近 6T,job啟動起來有1w多個map,一下子整個queue的資源就占滿了。雖然通過調整一些參數可以控制map數(也就是并發數),但是無法準確的控 制map數,而且換個數據源又得重新配置參數。
第一個改進的版本是,加了Reduce過程,以期望通過設置Reduce數量來控制并發數。這樣雖然能精確地控制并發數,但是增加了shuffle 過程,實際運行中發現輸入數據有傾斜(而partition的key由于業務需要無法更改),導致部分機器網絡被打滿,從而影響到了集群中的其他應用。即 使通過 mapred.reduce.parallel.copies 參數來限制shuffle也是治標不治本。這個平白增加的shuffle過程實際上浪費了很多網絡帶寬和IO。
最理想的情況當然是只有map階段,而且能夠準確的控制并發數了。
于是,第二個優化版本誕生了。這個job只有map階段,采用CombineFileInputFormat, 它可以將多個小文件打包成一個InputSplit提供給一個Map處理,避免因為大量小文件問題,啟動大量map。通過 mapred.max.split.size 參數可以大概地控制并發數。本以為這樣就能解決問題了,結果又發現了數據傾斜的問題。這種粗略地分splits的方式,導致有的map處理的數據少,有的 map處理的數據多,并不均勻。幾個拖后退的map就導致job的實際運行時間長了一倍多。
看來只有讓每個map處理的數據量一樣多,才能完美的解決這個問題了。
第三個版本也誕生了,這次是重寫了CombineFileInputFormat,自己實現getSplits方法。由于輸入數據為SequenceFile格式,因此需要一個SequenceFileRecordReaderWrapper類。
實現代碼如下:
CustomCombineSequenceFileInputFormat.java
import java.io.IOException; |
import org.apache.hadoop.classification.InterfaceAudience; |
import org.apache.hadoop.classification.InterfaceStability; |
import org.apache.hadoop.mapreduce.InputSplit; |
import org.apache.hadoop.mapreduce.RecordReader; |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; |
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; |
* Input format that is a <code>CombineFileInputFormat</code>-equivalent for |
* <code>SequenceFileInputFormat</code>. |
* @see CombineFileInputFormat |
@InterfaceAudience .Public |
@InterfaceStability .Stable |
public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> { |
@SuppressWarnings ({ "rawtypes" , "unchecked" }) |
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) |
return new CombineFileRecordReader((CombineFileSplit) split, context, |
SequenceFileRecordReaderWrapper. class ); |
* A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be |
* used in a <code>CombineFileInputFormat</code>-equivalent for |
* <code>SequenceFileInputFormat</code>. |
* @see CombineFileRecordReader |
* @see CombineFileInputFormat |
* @see SequenceFileInputFormat |
private static class SequenceFileRecordReaderWrapper<K, V> |
extends CombineFileRecordReaderWrapper<K, V> { |
// this constructor signature is required by CombineFileRecordReader |
public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, |
Integer idx) throws IOException, InterruptedException { |
super ( new SequenceFileInputFormat<K, V>(), split, context, idx); |
MultiFileInputFormat.java
import java.io.IOException; |
import java.util.ArrayList; |
import org.apache.commons.logging.Log; |
import org.apache.commons.logging.LogFactory; |
import org.apache.hadoop.fs.FileStatus; |
import org.apache.hadoop.fs.Path; |
import org.apache.hadoop.mapreduce.InputSplit; |
import org.apache.hadoop.mapreduce.Job; |
import org.apache.hadoop.mapreduce.JobContext; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; |
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; |
* multiple files can be combined in one InputSplit so that InputSplit number can be limited! |
public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { |
private static final Log LOG = LogFactory.getLog(MultiFileInputFormat. class ); |
public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num" ; |
public static final Integer DEFAULT_MAX_SPLIT_NUM = 50 ; |
public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) { |
job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum); |
public List<InputSplit> getSplits(JobContext job) throws IOException { |
// get all the files in input path |
List<FileStatus> stats = listStatus(job); |
List<InputSplit> splits = new ArrayList<InputSplit>(); |
for (FileStatus stat : stats) { |
totalLen += stat.getLen(); |
int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM); |
int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size(); |
long averageLen = totalLen / expectSplitNum; |
LOG.info( "Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen |
+ ") expectSplitNum(" + expectSplitNum + ") " ); |
List<Path> pathLst = new ArrayList<Path>(); |
List<Long> offsetLst = new ArrayList<Long>(); |
List<Long> lengthLst = new ArrayList<Long>(); |
for ( int i = 0 ; i < stats.size(); i++) { |
FileStatus stat = stats.get(i); |
pathLst.add(stat.getPath()); |
lengthLst.add(stat.getLen()); |
currentLen += stat.getLen(); |
if (splits.size() < expectSplitNum - 1 && currentLen > averageLen) { |
Path[] pathArray = new Path[pathLst.size()]; |
CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray), |
getLongArray(offsetLst), getLongArray(lengthLst), new String[ 0 ]); |
LOG.info( "combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size() |
+ ") length(" + currentLen + ")" ); |
if (pathLst.size() > 0 ) { |
Path[] pathArray = new Path[pathLst.size()]; |
CombineFileSplit thissplit = |
new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst), |
getLongArray(lengthLst), new String[ 0 ]); |
LOG.info( "combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size() |
+ ") length(" + currentLen + ")" ); |
private long [] getLongArray(List<Long> lst) { |
long [] rst = new long [lst.size()]; |
for ( int i = 0 ; i < lst.size(); i++) { |
通過 multifileinputformat.max_split_num 參數就可以較為準確的控制map數量,而且會發現每個map處理的數據量很均勻。至此,問題總算解決了。