<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 495,comments - 227,trackbacks - 0
    http://www.rigongyizu.com/use-multiinputformat-read-different-files-in-one-job/

    hadoop中提供了 MultiOutputFormat 能將結(jié)果數(shù)據(jù)輸出到不同的目錄,也提供了 FileInputFormat 來一次讀取多個目錄的數(shù)據(jù),但是默認(rèn)一個job只能使用 job.setInputFormatClass 設(shè)置使用一個inputfomat處理一種格式的數(shù)據(jù)。如果需要實現(xiàn) 在一個job中同時讀取來自不同目錄的不同格式文件 的功能,就需要自己實現(xiàn)一個 MultiInputFormat 來讀取不同格式的文件了(原來已經(jīng)提供了MultipleInputs)。

    例如:有一個mapreduce job需要同時讀取兩種格式的數(shù)據(jù),一種格式是普通的文本文件,用 LineRecordReader 一行一行讀取;另外一種文件是偽XML文件,用自定義的AJoinRecordReader讀取。

    自己實現(xiàn)了一個簡單的 MultiInputFormat 如下:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     
    public class MultiInputFormat extends TextInputFormat {
     
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
            RecordReader reader = null;
            try {
                String inputfile = ((FileSplit) split).getPath().toString();
                String xmlpath = context.getConfiguration().get("xml_prefix");
                String textpath = context.getConfiguration().get("text_prefix");
     
                if (-1 != inputfile.indexOf(xmlpath)) {
                    reader = new AJoinRecordReader();
                } else if (-1 != inputfile.indexOf(textpath)) {
                    reader = new LineRecordReader();
                } else {
                    reader = new LineRecordReader();
                }
            } catch (IOException e) {
                // do something ...
            }
     
            return reader;
        }
    }

    其實原理很簡單,就是在 createRecordReader 的時候,通過 ((FileSplit) split).getPath().toString() 獲取到當(dāng)前要處理的文件名,然后根據(jù)特征匹配,選取對應(yīng)的 RecordReader 即可。xml_prefix和text_prefix可以在程序啟動時通過 -D 傳給Configuration。

    比如某次執(zhí)行打印的值如下:

    inputfile=hdfs://test042092.sqa.cm4:9000/test/input_xml/common-part-00068
    xmlpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_xml
    textpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_txt

    這里只是通過簡單的文件路徑和標(biāo)示符匹配來做,也可以采用更復(fù)雜的方法,比如文件名、文件后綴等。

    接著在map類中,也同樣可以根據(jù)不同的文件名特征進(jìn)行不同的處理:

    @Override
    public void map(LongWritable offset, Text inValue, Context context)
            throws IOException {
     
        String inputfile = ((FileSplit) context.getInputSplit()).getPath()
                .toString();
     
        if (-1 != inputfile.indexOf(textpath)) {
            ......
        } else if (-1 != inputfile.indexOf(xmlpath)) {
            ......
        } else {
            ......
        }
    }

    這種方式太土了,原來hadoop里面已經(jīng)提供了 MultipleInputs 來實現(xiàn)對一個目錄指定一個inputformat和對應(yīng)的map處理類。

    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
       MapClass.class);
    MultipleInputs.addInputPath(conf, new Path("/bar"),
       KeyValueTextInputFormat.class, MapClass2.class);
    posted on 2014-09-16 09:27 SIMONE 閱讀(2750) 評論(0)  編輯  收藏 所屬分類: hadoop
    主站蜘蛛池模板: 蜜芽亚洲av无码一区二区三区| 亚洲乱码无人区卡1卡2卡3| 亚洲精品黄色视频在线观看免费资源 | 亚洲免费视频在线观看| 亚洲av无码一区二区乱子伦as | 亚洲Av无码乱码在线znlu| 黄色免费网址大全| 2015日韩永久免费视频播放 | 亚洲人成网站999久久久综合| 午夜寂寞在线一级观看免费| 久久久久国产亚洲AV麻豆 | 免费无码H肉动漫在线观看麻豆| 亚洲av永久无码精品秋霞电影影院| 日本道免费精品一区二区| 亚洲日本一区二区| 成人福利在线观看免费视频| 在线观看人成网站深夜免费| 亚洲一区中文字幕久久| 五月婷婷综合免费| 亚洲网站在线观看| 一二三四免费观看在线电影| 亚洲成年人电影在线观看| 国外成人免费高清激情视频| 美国免费高清一级毛片| 亚洲精品美女久久久久99| 四虎成年永久免费网站| 久久久久久久久无码精品亚洲日韩| 成熟女人特级毛片www免费| 免费观看四虎精品成人| 日韩亚洲欧洲在线com91tv| 两个人的视频www免费| 亚洲裸男gv网站| 午夜不卡久久精品无码免费| 亚洲高清一区二区三区| 国产成人午夜精品免费视频| 立即播放免费毛片一级| 亚洲国产精品不卡在线电影| 色吊丝最新永久免费观看网站| 99精品免费视品| 亚洲卡一卡2卡三卡4卡无卡三| 性色av免费观看|