日期:2014-05-16  浏览次数:20335 次

MapReduce之Join操作(2)

??? 上一篇介绍了 Repartition Join 的基本思想,实践出真知,具体的实现中总是存在各种细节问题。下面我们通过具体的源码分析来加深理解。本文分析的是 Hadoop-0.20.2 版本的 datajoin 代码,其它版本也许会有变化,这里暂且不论。

参看源码目录下,共实现有 7 个类,分别是:

  • ArrayListBackIterator.java
  • DataJoinJob.java
  • DataJoinMapperBase.java
  • DataJoinReducerBase.java
  • JobBase.java
  • ResetableIterator.java
  • TaggedMapOutput.java

??????? 源码比较简单,代码量小,下面对一些关键的地方进行分析:前面我们提到了 map 阶段的输出的 key 值的设定;然而在实现中,其value值也是另外一个需要考虑的地方,在不同的 reduce 结点进行 join 操作时,需要知道参与 join 的元组所属的表;解决方法是在 map 输出的 value 值中加入一个标记 (tag) ,例如上一篇例子中两表的 tag 可以分别 customer order (注:实际上,在reduce阶段可以直接分析两元组的结构就可以确定数据来源)。这也是 TaggedMapOutput.java 的来历。作为 Hadoop 的中间数据,必须实现 Writable 的方法,如下所示:

public abstract class TaggedMapOutput implements Writable {
    protected Text tag;
    public TaggedMapOutput() {
        this.tag = new Text("");
    }
    public Text getTag() {
        return tag;
    }
    public void setTag(Text tag) {
        this.tag = tag;
    }
    public abstract Writable getData();  
    public TaggedMapOutput clone(JobConf job) {
        return (TaggedMapOutput) WritableUtils.clone(this, job);
    }
}?

接下来,我们看看 DataJoinMapperBase 中的相关方法

protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);
protected abstract Text generateGroupKey(TaggedMapOutput aRecord);

以上两个方法需要由子类实现。上一篇文章提到,将两个表的连接键作为 map 输出的 key 值,其中第二个方法所做的就是这件事,生成一个类型为 Text key ,不过这里是将它称作是 GroupKey 而已。因此 map 方法也就比较简单易懂了

public void map(Object key, Object value, OutputCollector output, 
                         Reporter reporter) throws IOException {
    if (this.reporter == null) {
        this.reporter = reporter;
    }
    addLongValue("totalCount", 1);
    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
    if (aRecord == null) {
        addLongValue("discardedCount", 1);
        return;
    }
    Text groupKey = generateGroupKey(aRecord);
    if (groupKey == null) {
        addLongValue("nullGroupKeyCount", 1);
        return;
    }
    output.collect(groupKey, aRecord);
    addLongValue("collectedCount", 1);
}

说完了 map 操作,接下来就是