InputFormat,就是為了能夠從一個jobconf中得到一個split集合(InputSplit[]),然后再為這個split集合配上一個合適的RecordReader(getRecordReader)來讀取每個split中的數據。
InputSplit,繼承自Writable接口,因此一個InputSplit實則包含了四個接口函數,讀和寫(readFields和write),getLength能夠給出這個split中所記錄的數據大小,getLocations能夠得到這個split位于哪些主機之上(blkLocations[blkIndex].getHosts()),這里需要說明的是一個block要么對應一個split,要么對應多個split,因此每個split都可以從它所屬的block中獲取主機信息,而且我猜測block的大小應該是split的整數倍,否則有可能一個split跨越兩個block。
對于RecordReader,其實這個接口主要就是為了維護一組<K,V>鍵值對,任何一個實現了該接口的類的構造函數都需要是“(Configuration conf, Class< ? extends InputSplit> split)”的形式,因為一個RecordReader是有針對性的,就是針對某種split來進行的,因此必須得與某種split綁定起來。這個接口中最重要的方法就是next,在利用next進行讀取K和V時,需要先通過createKey和createValue來創建K和V的對象,然后再傳給next作為參數,使得next對形參中的數據成員進行修改。
一個file(FileStatus)分成多個block存儲(BlockLocation[]),每個block都有固定的大小(file.getBlockSize()),然后計算出每個split所需的大小(computeSplitSize(goalSize, minSize, blockSize)),然后將長度為length(file.getLen())的file分割為多個split,最后一個不足一個split大小的部分單獨為其分配一個split,最后返回這個file分割的最終結果(return splits.toArray(new FileSplit[splits.size()]))。
一個job,會得到輸入的文件路徑(conf.get("mapred.input.dir", "")),然后據此可以得到一個Path[],對于每個Path,都可以得到一個fs(FileSystem fs = p.getFileSystem(job)),然后再得到一個FileStatus[](FileStatus[] matches = fs.globStatus(p, inputFilter)),再把里面的每個FileStatus拿出來,判斷其是否為dir,如果是的話就FileStatus stat:fs.listStatus(globStat.getPath(), inputFilter),然后再將stat加入到最終的結果集中result;如果是文件的話,那就直接加入到結果集中。說得簡潔一些,就是一個job會得到input.dir中的所有文件,每個文件都用FileStatus來記錄。
MultiFileSplit的官方描述是“A sub-collection of input files. Unlike {@link FileSplit}, MultiFileSplit class does not represent a split of a file, but a split of input files into smaller sets. The atomic unit of split is a file.”,一個MultiFileSplit中含有多個小文件,每個文件應該只隸屬于一個block,然后getLocations就返回所有小文件對應的block的getHosts;getLength返回所有文件的大小總和。
對于MultiFileInputFormat,它的getSplits返回的是一個MultiFileSplit的集合,也就是一個個的小文件簇,舉個簡單的例子就會很清楚了:假定這個job中有5個小文件,大小分別為2,3,5,1,4;假定我們期望split的總數目為3的話,先算出個double avgLengthPerSplit = ((double)totLength) / numSplits,結果應該為5;然后再切分,因此得到的三個文件簇為:{文件1和2}、{文件3}、{文件4和5}。如果這五個文件的大小分別為2,5,3,1,4;那么應該得到四個文件簇為:{文件1}、{文件2}、{文件3和4}、{文件5}。此外,這個類的getRecordReader依然是個abstract的方法,因此其子類必須得實現這個函數。