日期:2014-05-16 浏览次数:20362 次
本文讲述如何在map端完成join操作。之前我们提到了reduce-join,这种方法的灵活性不错,也是理所当然地能够想到的方法;但这种方法存在的一个最大的问题是性能。大量的中间数据需要从map节点通过网络发送到reduce节点,因而效率比较低。实际上,两表的join操作中很多都是无用的数据。现在考虑可能的一种场景,其中一个表非常小,以致于可以直接存放在内存中,那么我们可以利用Hadoop提供的DistributedCache机制,将较小的表加入到其中,在每个map节点都能够访问到该表,最终实现在map阶段完成join操作。这里提一下DistributedCache,可以直观上将它看作是一个全局的只读空间,存储一些需要共享的数据;具体可以参看Hadoop相关资料,这里不进行深入讨论。
实现的源码如下,原理非常简单明了:
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.Hashtable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @SuppressWarnings("deprecation") public class DataJoinDC extends Configured implements Tool{ private final static String inputa = "hdfs://m100:9000/joinTest/Customers"; private final static String inputb = "hdfs://m100:9000/joinTest/Orders"; private final static String output = "hdfs://m100:9000/joinTest/output"; public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> { private Hashtable<String, String> joinData = new Hashtable<String, String>(); @Override public void configure(JobConf conf) { try { Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf); if (cacheFiles != null && cacheFiles.length > 0) { String line; String[] tokens; BufferedReader joinReader = new BufferedReader( new FileReader(cacheFiles[0].toString())); try { while ((line = joinReader.readLine()) != null) { tokens = line.split(",", 2); joinData.put(tokens[0], tokens[1]); } }finally { joinReader.close(); }}} catch (IOException e) { System.err.println("Exception reading DistributedCache: " + e); } } public void map(Text key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws 
IOException { // for(String t: joinData.keySet()){ // output.collect(new Text(t), new Text(joinData.get(t))); // } String joinValue = joinData.get(key.toString()); if (joinValue != null) { output.collect(key,new Text(value.toString() + "," + joinValue)); } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); DistributedCache.addCacheFile(new Path(inputa).toUri(), conf); JobConf job = new JobConf(conf, DataJoinDC.class); Path in = new Path(inputb); Path out = new Path(output); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin with DistributedCache"); job.setMapperClass(MapClass.class); job.setNumReduceTasks(0); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.set("key.value.separator.in.input.line", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception{ int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args); System.exit(res); } } ?
以上参照《Hadoop in Action》所附代码,我这里是将Customers表作为较小的表,传入DistributedCache。
这里需要注意的地方:
DistributedCache.addCacheFile(new Path(inputa).toUri(), conf);
这句一定要放在 JobConf 对象创建之前调用。因为 JobConf 在构造时会复制 Configuration 中已有的配置,如果先创建 JobConf 再调用 addCacheFile,缓存文件不会被注册到作业中,map 端就读取不到 Customers 表。