http://www.rigongyizu.com/use-multiinputformat-read-different-files-in-one-job/
hadoop中提供了 MultiOutputFormat 能將結(jié)果數(shù)據(jù)輸出到不同的目錄,也提供了 FileInputFormat 來一次讀取多個目錄的數(shù)據(jù),但是默認(rèn)一個job只能使用 job.setInputFormatClass 設(shè)置使用一個inputfomat處理一種格式的數(shù)據(jù)。如果需要實現(xiàn) 在一個job中同時讀取來自不同目錄的不同格式文件 的功能,就需要自己實現(xiàn)一個 MultiInputFormat 來讀取不同格式的文件了(原來已經(jīng)提供了MultipleInputs)。
例如:有一個mapreduce job需要同時讀取兩種格式的數(shù)據(jù),一種格式是普通的文本文件,用 LineRecordReader 一行一行讀取;另外一種文件是偽XML文件,用自定義的AJoinRecordReader讀取。
自己實現(xiàn)了一個簡單的 MultiInputFormat 如下:
import org.apache.hadoop.io.LongWritable; |
import org.apache.hadoop.io.Text; |
import org.apache.hadoop.mapreduce.InputSplit; |
import org.apache.hadoop.mapreduce.RecordReader; |
import org.apache.hadoop.mapreduce.TaskAttemptContext; |
import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
public class MultiInputFormat extends TextInputFormat { |
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { |
RecordReader reader = null ; |
String inputfile = ((FileSplit) split).getPath().toString(); |
String xmlpath = context.getConfiguration().get( "xml_prefix" ); |
String textpath = context.getConfiguration().get( "text_prefix" ); |
if (- 1 != inputfile.indexOf(xmlpath)) { |
reader = new AJoinRecordReader(); |
} else if (- 1 != inputfile.indexOf(textpath)) { |
reader = new LineRecordReader(); |
reader = new LineRecordReader(); |
} catch (IOException e) { |
其實原理很簡單,就是在 createRecordReader 的時候,通過 ((FileSplit) split).getPath().toString() 獲取到當(dāng)前要處理的文件名,然后根據(jù)特征匹配,選取對應(yīng)的 RecordReader 即可。xml_prefix和text_prefix可以在程序啟動時通過 -D 傳給Configuration。
比如某次執(zhí)行打印的值如下:
這里只是通過簡單的文件路徑和標(biāo)示符匹配來做,也可以采用更復(fù)雜的方法,比如文件名、文件后綴等。
接著在map類中,也同樣可以根據(jù)不同的文件名特征進(jìn)行不同的處理:
public void map(LongWritable offset, Text inValue, Context context) |
String inputfile = ((FileSplit) context.getInputSplit()).getPath() |
if (- 1 != inputfile.indexOf(textpath)) { |
} else if (- 1 != inputfile.indexOf(xmlpath)) { |
這種方式太土了,原來hadoop里面已經(jīng)提供了 MultipleInputs 來實現(xiàn)對一個目錄指定一個inputformat和對應(yīng)的map處理類。
MultipleInputs.addInputPath(conf, new Path( "/foo" ), TextInputFormat. class , |
MultipleInputs.addInputPath(conf, new Path( "/bar" ), |
KeyValueTextInputFormat. class , MapClass2. class ); |
posted on 2014-09-16 09:27
SIMONE 閱讀(2750)
評論(0) 編輯 收藏 所屬分類:
hadoop