日期:2014-05-16 浏览次数:20422 次
??? 上一篇介绍了 Repartition Join 的基本思想,实践出真知,具体的实现中总是存在各种细节问题。下面我们通过具体的源码分析来加深理解。本文分析的是 Hadoop-0.20.2 版本的 datajoin 代码,其它版本也许会有变化,这里暂且不论。
参看源码目录下,共实现有 7 个类,分别是:
- ArrayListBackIterator.java
 - DataJoinJob.java
 - DataJoinMapperBase.java
 - DataJoinReducerBase.java
 - JobBase.java
 - ResetableIterator.java
 - TaggedMapOutput.java
 
??????? 源码比较简单,代码量小,下面对一些关键的地方进行分析:前面我们提到了 map 阶段的输出的 key 值的设定;然而在实现中,其value值也是另外一个需要考虑的地方,在不同的 reduce 结点进行 join 操作时,需要知道参与 join 的元组所属的表;解决方法是在 map 输出的 value 值中加入一个标记 (tag) ,例如上一篇例子中两表的 tag 可以分别 customer 和 order (注:实际上,在reduce阶段可以直接分析两元组的结构就可以确定数据来源)。这也是 TaggedMapOutput.java 的来历。作为 Hadoop 的中间数据,必须实现 Writable 的方法,如下所示:
public abstract class TaggedMapOutput implements Writable {
    protected Text tag;
    public TaggedMapOutput() {
        this.tag = new Text("");
    }
    public Text getTag() {
        return tag;
    }
    public void setTag(Text tag) {
        this.tag = tag;
    }
    public abstract Writable getData();  
    public TaggedMapOutput clone(JobConf job) {
        return (TaggedMapOutput) WritableUtils.clone(this, job);
    }
}?
接下来,我们看看 DataJoinMapperBase 中的相关方法
protected abstract TaggedMapOutput generateTaggedMapOutput(Object value); protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
以上两个方法需要由子类实现。上一篇文章提到,将两个表的连接键作为 map 输出的 key 值,其中第二个方法所做的就是这件事,生成一个类型为 Text 的 key ,不过这里是将它称作是 GroupKey 而已。因此 map 方法也就比较简单易懂了
public void map(Object key, Object value, OutputCollector output, 
                         Reporter reporter) throws IOException {
    if (this.reporter == null) {
        this.reporter = reporter;
    }
    addLongValue("totalCount", 1);
    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
    if (aRecord == null) {
        addLongValue("discardedCount", 1);
        return;
    }
    Text groupKey = generateGroupKey(aRecord);
    if (groupKey == null) {
        addLongValue("nullGroupKeyCount", 1);
        return;
    }
    output.collect(groupKey, aRecord);
    addLongValue("collectedCount", 1);
}
说完了
map
操作,接下来就是