Posted on 2008-08-23 11:08 by dennis
Categories: Java, Hadoop and Distributed Systems
A couple of days ago I set up a two-node Hadoop cluster on our company intranet. It has no real production purpose for now; it is just for my own testing. The problems I ran into are all covered in the article "Hadoop集群配置和使用技巧" (Hadoop Cluster Configuration and Usage Tips) by a fellow engineer at Alibaba. I also hit the problem of reduce tasks getting stuck, which is solved by listing every machine in the cluster in /etc/hosts on each node.
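Concretely, the fix is just making every node's hostname resolvable from every other node. A sketch of the /etc/hosts entries, with made-up IPs and hostnames:

```
192.168.0.1   hadoop-master
192.168.0.2   hadoop-slave1
```

Every node in the cluster should carry the same set of entries, so that the hostnames the JobTracker and TaskTrackers report to each other all resolve.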
Today I reimplemented a log-statistics task with the Hadoop MapReduce framework. The data volume is not large: it analyzes a log file of a bit over 2 GB per day. Previously I handled it with Ruby plus the cat and grep commands, and one run took 50-odd seconds. Pure Ruby alone was unbearably slow with very high CPU usage, but preprocessing with Linux's cat and grep through IO.popen made it much better. Here is the MapReduce job:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GameCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Pattern pattern;

        public void configure(JobConf job) {
            // The game name is passed in through the job configuration.
            String gameName = job.get("mapred.mapper.game");
            pattern = Pattern.compile("play\\sgame\\s" + gameName
                    + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String text = value.toString();
            Matcher matcher = pattern.matcher(text);
            int total = 0; // total number of plays matched on this line
            while (matcher.find()) {
                int record = Integer.parseInt(matcher.group(2));
                output.collect(new Text(matcher.group(1)),
                        new IntWritable(record));
                total += 1;
            }
            output.collect(new Text("total"), new IntWritable(total));
        }
    }

    public static class ReduceClass extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println(
                "gamecount [-m <maps>] [-r <reduces>] <input> <output> <gamename>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), GameCount.class);
        conf.setJobName("gamecount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(ReduceClass.class);
        conf.setReducerClass(ReduceClass.class);
        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of "
                        + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 3 parameters left: input, output, game name.
        if (other_args.size() != 3) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 3.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        // Read from other_args, not args: args may still contain -m/-r flags.
        conf.set("mapred.mapper.game", other_args.get(2));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        int res = ToolRunner.run(new Configuration(), new GameCount(), args);
        System.out.println("running time:" + (System.nanoTime() - start)
                / 1000000 + " ms");
        System.exit(res);
    }
}
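The regex built in configure() can be checked in isolation before submitting a job. A minimal standalone sketch, using an assumed game name and a sample line in the same format as the log:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        String gameName = "DouDiZhu"; // assumed game name, for illustration only
        Pattern pattern = Pattern.compile("play\\sgame\\s" + gameName
                + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
        // Sample line in the log format the mapper expects.
        String line = "play game DouDiZhu result:uid=1871653,score=-720,money=0";
        Matcher m = pattern.matcher(line);
        if (m.find()) {
            // group(1) = uid, group(2) = score, group(3) = money
            System.out.println("uid=" + m.group(1) + " score=" + m.group(2)
                    + " money=" + m.group(3));
        }
    }
}
```

Running it prints `uid=1871653 score=-720 money=0`, confirming the capture groups line up with what map() parses.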
There is not much to explain about the code: it parses lines like "play game DouDiZhu result:uid=1871653,score=-720,money=0" to compute how many times each player played per day, their scores, and so on. Package it as GameCount.jar and run:
hadoop jar GameCount.jar test.GameCount /usr/logs/test.log /usr/output GameName
The job takes 100-odd seconds to run; moderately increasing the number of map and reduce tasks does not improve it much, and CPU usage is still quite high.
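For comparison, the per-uid score summation that the reducer performs is essentially what the earlier cat/grep/Ruby pipeline computed. A minimal shell sketch over a few made-up sample lines (the uids and scores here are invented for illustration):

```shell
printf '%s\n' \
  'play game DouDiZhu result:uid=1871653,score=-720,money=0' \
  'play game DouDiZhu result:uid=1871653,score=300,money=5' \
  'play game DouDiZhu result:uid=42,score=100,money=1' \
| grep 'play game DouDiZhu' \
| sed 's/.*uid=\([0-9]*\),score=\(-\{0,1\}[0-9]*\).*/\1 \2/' \
| awk '{ sum[$1] += $2 } END { for (u in sum) print u, sum[u] }'
# prints each uid with its summed score (order unspecified)
```

On a single machine this is hard to beat for a 2 GB file; the MapReduce version only pays off once the data or the cluster grows.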