日期:2014-05-16  浏览次数:20561 次

MapReduce实现reduce端join,多数据源

在使用MapReduce(MR)处理数据时,经常会遇到多数据源join的问题。如果只是简单的分析任务,用Hive处理即可;如果逻辑稍复杂一些,就需要自己编写MR程序。

多个数据源通过MultipleInputs类的addInputPath方法添加,可以为每个输入路径分别指定InputFormat和对应的Mapper。

Job类

/**
 * Driver for a two-stage MapReduce pipeline that performs a reduce-side join
 * over two input directories (orders and addresses).
 *
 * <p>Stage 1 registers one mapper per input directory through
 * {@link MultipleInputs}; both mappers emit {@link TextPair} keys and the
 * joined result is written to {@code OUTPUT + start}. Stage 2 reads that
 * output and writes to the table named by {@code OUTPUT_TABLE} via
 * {@code TableMapReduceUtil.initTableReducerJob}.
 */
public class EfcOrderProRangeOdJob extends Configured implements Tool {

	// TODO: replace these local test paths with the real input/output locations.
	private static final String INPUT_ORDER = "D:/order/order/";
	private static final String INPUT_ADDRESS = "D:/order/address/";
	private static final String OUTPUT = "D:/testAAAAA/";
	// Alternative output location kept from the original source:
	// /warehouse/tmp/pt_eft_order_pro_range/
	private static final String OUTPUT_TABLE = "fct_pt_icr_trade_day";

	/** Exit code reported when the pipeline fails with an exception. */
	private static final int EXIT_FAILURE = 1;

	/**
	 * Command-line entry point. Runs the tool and terminates the JVM with the
	 * tool's exit code, or with a non-zero code if the tool throws.
	 *
	 * @param args command-line arguments forwarded to {@link ToolRunner}
	 */
	public static void main(String[] args) {
		int res;
		try {
			res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
		} catch (Exception e) {
			e.printStackTrace();
			// A failure must not look like success to the calling shell/scheduler.
			res = EXIT_FAILURE;
		}
		System.exit(res);
	}

	/**
	 * Configures both jobs and hands them to {@code JobChainHandler}.
	 *
	 * @param args command-line arguments (currently unused)
	 * @return the value returned by {@code JobChainHandler.handleJobChain}
	 *         (presumably an exit code; confirm against that helper), or a
	 *         non-zero value if setting up or running the jobs throws
	 * @throws Exception declared by {@link Tool#run}; exceptions raised here
	 *         are caught, printed, and converted to a non-zero return value
	 */
	@Override
	public int run(String[] args) throws Exception {
		try {
			String start = "20130217"; // TODO: take the start date from args
			Configuration conf = ConfUtil.getConf(getConf());
			conf.set("start", start);
			Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");

			Path pathOrder = new Path(INPUT_ORDER);
			Path pathAddress = new Path(INPUT_ADDRESS);
			Path output = new Path(OUTPUT + start + "/");

			// Remove any previous output directory before the first job runs.
			FileSystem fs = FileSystem.get(conf);
			if (fs.exists(output)) {
				fs.delete(output, true);
			}

			job1.setMapOutputKeyClass(TextPair.class);
			job1.setMapOutputValueClass(Text.class);

			FileOutputFormat.setOutputPath(job1, output);
			// Each data source gets its own mapper; both feed the same reducer.
			MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
			MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);

			job1.setReducerClass(EfcOrderProRangeReducer.class);
			job1.setJarByClass(EfcOrderProRangeOdJob.class);

			// The second job consumes the first job's output directory.
			Job job2 = Job.getInstance(conf, "pt_eft_order_pro_range_second");
			FileInputFormat.setInputPaths(job2, output);
			job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
			job2.setMapOutputKeyClass(Text.class);
			job2.setMapOutputValueClass(IntWritable.class);
			TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);

			return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");
		} catch (Exception e) {
			e.printStackTrace();
			// Previously returned 0 here, which reported a failed run as success.
			return EXIT_FAILURE;
		}
	}

	/**
	 * Composite map-output key made of two {@link Text} fields.
	 *
	 * <p>Ordering ({@link #compareTo}) and hashing ({@link #hashCode}) use only
	 * {@code first}, so every record sharing the same {@code first} value is
	 * sent to the same reducer and compares as equal during the sort.
	 * {@code second} is carried along as payload.
	 */
	public static class TextPair implements WritableComparable<TextPair> {
		private Text first;
		private Text second;

		/** Creates a pair of empty {@link Text} values (needed for deserialization). */
		public TextPair() {
			set(new Text(), new Text());
		}

		/**
		 * Creates a pair from two strings.
		 *
		 * @param first  value of the first field
		 * @param second value of the second field
		 */
		public TextPair(String first, String second) {
			set(new Text(first), new Text(second));
		}

		/**
		 * Creates a pair that holds the given {@link Text} instances (no copy is made).
		 *
		 * @param first  first field
		 * @param second second field
		 */
		public TextPair(Text first, Text second) {
			set(first, second);
		}

		/**
		 * Replaces both fields with the given instances (no copy is made).
		 *
		 * @param first  new first field
		 * @param second new second field
		 */
		public void set(Text first, Text second) {
			this.first = first;
			this.second = second;
		}

		/** @return the first field */
		public Text getFirst() {
			return first;
		}

		/** @return the second field */
		public Text getSecond() {
			return second;
		}

		/**
		 * Serializes {@code first} followed by {@code second}.
		 *
		 * @param out destination stream
		 * @throws IOException if writing either field fails
		 */
		@Override
		public void write(DataOutput out) throws IOException {
			first.write(out);
			second.write(out);
		}

		/**
		 * Deserializes {@code first} followed by {@code second}, in the same
		 * order used by {@link #write}.
		 *
		 * @param in source stream
		 * @throws IOException if reading either field fails
		 */
		@Override
		public void readFields(DataInput in) throws IOException {
			first.readFields(in);
			second.readFields(in);
		}

		/**
		 * Orders pairs by {@code first} only; {@code second} is intentionally
		 * ignored so that pairs with the same {@code first} compare as equal.
		 *
		 * @param tp the pair to compare against
		 * @return the result of comparing the two {@code first} fields
		 */
		@Override
		public int compareTo(TextPair tp) {
			return first.compareTo(tp.first);
		}

		/**
		 * Two pairs are equal when both fields are equal.
		 *
		 * @param obj object to compare with
		 * @return {@code true} if {@code obj} is a {@code TextPair} with equal fields
		 */
		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (!(obj instanceof TextPair)) {
				return false;
			}
			TextPair other = (TextPair) obj;
			return first.equals(other.first) && second.equals(other.second);
		}

		/**
		 * Hashes on {@code first} only. A hash-based partitioner (Hadoop's
		 * default, unless the job configuration overrides it) then routes all
		 * records with the same {@code first} to the same reducer; the
		 * inherited identity hash would scatter them.
		 *
		 * @return the hash code of {@code first}
		 */
		@Override
		public int hashCode() {
			return first.hashCode();
		}
	}
}

Mapper1类(订单数据源对应的Mapper)

public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text>{
	private static final int ORDER_ID_INDEX = 2;
	private static final int ORDER_STATUS_INDEX = 5;
	private static final String EFFECTIVE_STATUS = "3";
	private static final String COL_SPLITER = "\001";
	
	@Override
	public void map(LongWritable key, Text value, Context context) {
		try {
			String [] order = value.toString().split(COL_SPLITER);
			String orderId = order[ORDER_ID_INDEX];
			String status = order[ORDER_STATUS_INDEX