經過上一篇的分析,我們知道了Hadoop的作業提交目標是Cluster還是Local,與conf文件夾內的配置文件參數有著密切關系,不僅如此,其它的很多類都跟conf有關,所以提交作業時切記把conf放到你的classpath中。
因為Configuration是利用當前線程上下文的類加載器來加載資源和文件的,所以這里我們采用動態載入的方式,先添加好對應的依賴庫和資源,然后再構建一個URLClassLoader作為當前線程上下文的類加載器。
public static ClassLoader getClassLoader() {
ClassLoader parent = Thread.currentThread().getContextClassLoader();
if (parent == null) {
parent = EJob.class.getClassLoader();
}
if (parent == null) {
parent = ClassLoader.getSystemClassLoader();
}
return new URLClassLoader(classPath.toArray(new URL[0]), parent);
}
代碼很簡單,廢話就不多說了。調用例子如下:
EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
設置好了類加載器,下面還有一步就是要打包Jar文件,就是讓Project自打包自己的class為一個Jar包,我這里以標準Eclipse工程文件夾布局為例,打包的就是bin文件夾里的class。

public static File createTempJar(String root) throws IOException {
if (!new File(root).exists()) {
return null;
}
Manifest manifest = new Manifest();
manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
final File jarFile = File.createTempFile("EJob-", ".jar", new File(System
.getProperty("java.io.tmpdir")));
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
jarFile.delete();
}
});
JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile),
manifest);
createTempJarInner(out, new File(root), "");
out.flush();
out.close();
return jarFile;
}
private static void createTempJarInner(JarOutputStream out, File f,
String base) throws IOException {
if (f.isDirectory()) {
File[] fl = f.listFiles();
if (base.length() > 0) {
base = base + "/";
}
for (int i = 0; i < fl.length; i++) {
createTempJarInner(out, fl[i], base + fl[i].getName());
}
} else {
out.putNextEntry(new JarEntry(base));
FileInputStream in = new FileInputStream(f);
byte[] buffer = new byte[1024];
int n = in.read(buffer);
while (n != -1) {
out.write(buffer, 0, n);
n = in.read(buffer);
}
in.close();
}
}

這里的對外接口是createTempJar,接收參數為需要打包的文件夾根路徑,支持子文件夾打包。使用遞歸處理法,依次把文件夾里的結構和 文件打包到Jar里。很簡單,就是基本的文件流操作,陌生一點的就是Manifest和JarOutputStream,查查API就明了。
好,萬事具備,只欠東風了,我們來實踐一下試試。還是拿WordCount來舉例:
// Add these statements. XXX
File jarFile = EJob.createTempJar("bin");
EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCountTest.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
Run as Java Application。。。!!!No job jar file set...異常,看來job.setJarByClass(WordCountTest.class)這個語句設置作業Jar包沒有成功。這是為什么呢?
因為這個方法使用了WordCount.class的類加載器來尋找包含該類的Jar包,然后設置該Jar包為作業所用的Jar包。但是我們的作業 Jar包是在程序運行時才打包的,而WordCount.class的類加載器是AppClassLoader,運行后我們無法改變它的搜索路徑,所以使 用setJarByClass是無法設置作業Jar包的。我們必須使用JobConf里的setJar來直接設置作業Jar包,像下面一樣:
((JobConf)job.getConfiguration()).setJar(jarFile);
好,我們對上面的例子再做下修改,加上上面這條語句。
Job job = new Job(conf, "word count");
// And add this statement. XXX
((JobConf) job.getConfiguration()).setJar(jarFile.toString());
再Run as Java Application,終于OK了~~
該種方法的Run on Hadoop使用簡單,兼容性好,推薦一試。:)
本例子由于時間關系,只在Ubuntu上做了偽分布式測試,但理論上是可以用到真實分布式上去的。
>>點我下載<<
The end.