? ? ? ? 最近学习Hadoop,遇到一个问题,在eclipse中写完MapReduce之后如何在Hadoop中运行呢,常见的做法是将程序打成一个Jar包,然后传到hadoop集群中,通过命令行$HADOOP_HOME/bin/hadoop jar命令来运行,但是每次都要这样,十分麻烦。
? ? ? ? 当然,也可以在eclipse中安装hadoop的插件,可以方便的run on hadoop来运行。但是我们可以看到,client端的代码常见形式是在main方法中创建Configuration对象和job对象,具体代码如下
Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length!=2){ System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf,"combiner_test_job"); job.setJarByClass(CombinerTest.class); job.setMapperClass(CombinerTestMapper.class); job.setReducerClass(CombinerTestReducer.class); job.setCombinerClass(CombinerTestReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
,既然有main方法,我们能不能作为普通的java application来运行呢?试过之后我们很有可能会得到一个IOException,?
- java.io.IOException:?Failed?to?set?permissions?of?path:?\XX..XX\.staging?to?0700??
这个问题在我的另一篇blog中有说明,想进一步了解的点这里。解决了这个问题之后,应该就可以运行啦,好像这样问题就解决啦。但是,如果深入分析为什么为得到这个错误,我们会发现之前成功的运行job其实是在本机中运行的,也就是hadoop的local模式,job并没有在集群中的hadoop上运行,充其量也只是使用了一下HDFS。因为我们没有设置mapred.job.tracker,默认就是local。待我们将该属性设置为集群对应的值后,再运行,我们会发现,还是有错误。这次应该会得到一个ClassNotFoundException,其实是Mapper或者Reducer的类找不到。
? ? ? ? 为什么会找不到类呢?带着这个疑问,在网上查找一番,终于有所收获,大家可以看一下这篇文章,挺不错的,按照作者的分析,问题出在setJarByClass这个方法里面,其实最终调用的是JobConf类中的setJarByClass方法,该方法代码如下
public void setJarByClass(Class cls) { String jar = findContainingJar(cls); if (jar != null) { setJar(jar); } }?
首先,该方法会通过参数中的class查找对应的Jar文件,在命令行运行的情况下,是可以找到的,因为我们是将程序打好jar包之后调用hadoop jar来执行的,但是默认情况下,该jar肯定是不存在的,这样,就不会调用下面的setJar方法,所以程序中使用到的class就找不到了。知道了原因,我们只要将包含Mapper和Reducer以及其他类的Jar包通过setJar方法设置一下,就可以啦。
? ? ? ? 但是默认情况下,jar包是不存在的,我们可以在程序中通过代码实现将class打jar包,然后再进行设置。这里参考网上的资料,写了一个JarUtil类
package com.shidan.hadoop.util; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; public class JarUtil{ public static void jar(String inputFileName, String outputFileName){ JarOutputStream out = null; try{ out = new JarOutputStream(new FileOutputStream(outputFileName)); File f = new File(inputFileName); jar(out, f, ""); }catch (Exception e){ e.printStackTrace(); }finally{ try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } private static void jar(JarOutputStream out, File f, String base) throws Exception { if (f.isDirectory()) { File[] fl = f.listFiles(); base = base.length() == 0 ? "" : base + "/"; // 注意,这里用左斜杠 for (int i = 0; i < fl.length; i++) { jar(out, fl[ i], base + fl[ i].getName()); } } else { out.putNextEntry(new JarEntry(base)); FileInputStream in = new FileInputStream(f); byte[] buffer = new byte[1024]; int n = in.read(buffer); while (n != -1) { out.write(buffer, 0, n); n = in.read(buffer); } in.close(); } } }