日期:2014-05-16 浏览次数:20335 次
??? 上一篇介绍了 Repartition Join 的基本思想,实践出真知,具体的实现中总是存在各种细节问题。下面我们通过具体的源码分析来加深理解。本文分析的是 Hadoop-0.20.2 版本的 datajoin 代码,其它版本也许会有变化,这里暂且不论。
参看源码目录下,共实现有 7 个类,分别是:
- ArrayListBackIterator.java
- DataJoinJob.java
- DataJoinMapperBase.java
- DataJoinReducerBase.java
- JobBase.java
- ResetableIterator.java
- TaggedMapOutput.java
??????? 源码比较简单,代码量小,下面对一些关键的地方进行分析:前面我们提到了 map 阶段的输出的 key 值的设定;然而在实现中,其value值也是另外一个需要考虑的地方,在不同的 reduce 结点进行 join 操作时,需要知道参与 join 的元组所属的表;解决方法是在 map 输出的 value 值中加入一个标记 (tag) ,例如上一篇例子中两表的 tag 可以分别 customer 和 order (注:实际上,在reduce阶段可以直接分析两元组的结构就可以确定数据来源)。这也是 TaggedMapOutput.java 的来历。作为 Hadoop 的中间数据,必须实现 Writable 的方法,如下所示:
public abstract class TaggedMapOutput implements Writable { protected Text tag; public TaggedMapOutput() { this.tag = new Text(""); } public Text getTag() { return tag; } public void setTag(Text tag) { this.tag = tag; } public abstract Writable getData(); public TaggedMapOutput clone(JobConf job) { return (TaggedMapOutput) WritableUtils.clone(this, job); } }?
接下来,我们看看 DataJoinMapperBase 中的相关方法
protected abstract TaggedMapOutput generateTaggedMapOutput(Object value); protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
以上两个方法需要由子类实现。上一篇文章提到,将两个表的连接键作为 map 输出的 key 值,其中第二个方法所做的就是这件事,生成一个类型为 Text 的 key ,不过这里是将它称作是 GroupKey 而已。因此 map 方法也就比较简单易懂了
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (this.reporter == null) { this.reporter = reporter; } addLongValue("totalCount", 1); TaggedMapOutput aRecord = generateTaggedMapOutput(value); if (aRecord == null) { addLongValue("discardedCount", 1); return; } Text groupKey = generateGroupKey(aRecord); if (groupKey == null) { addLongValue("nullGroupKeyCount", 1); return; } output.collect(groupKey, aRecord); addLongValue("collectedCount", 1); }
说完了
map
操作,接下来就是