When writing MapReduce jobs you frequently run into joins over multiple data sources. For simple analysis tasks Hive is enough; for anything more involved you end up writing the MapReduce code yourself.
Multiple data sources are registered with the addInputPath method of the MultipleInputs class, which binds each input path to its own mapper.
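To show the shape of the API up front, here is a minimal sketch of the call pattern; the job, path, and mapper names below are placeholders, not the ones from this project:

MultipleInputs.addInputPath(job, new Path("/data/order"), TextInputFormat.class, OrderMapper.class);
MultipleInputs.addInputPath(job, new Path("/data/address"), TextInputFormat.class, AddressMapper.class);
// With MultipleInputs there is no job.setMapperClass(...); each path is routed to its own mapper,
// and all of the mappers must emit the same map output key/value types.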
Job class
public class EfcOrderProRangeOdJob extends Configured implements Tool {

    // TODO: paths
    private final static String INPUT_A = "D:/order/order/";
    private final static String INPUT_B = "D:/order/address/";
    private final static String OUTPUT = "D:/testAAAAA/";
    // private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";
    private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            String start = "20130217"; // TODO
            Configuration conf = ConfUtil.getConf(getConf());
            conf.set("start", start);

            // Job 1: join the order and address sources on the order id
            Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");
            Path pathOrder = new Path(INPUT_A);
            Path pathAddress = new Path(INPUT_B);
            Path output = new Path(OUTPUT + start + "/");

            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }

            job1.setMapOutputKeyClass(TextPair.class);
            job1.setMapOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job1, output);
            // Each input path is bound to its own mapper via MultipleInputs
            MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
            MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);
            job1.setReducerClass(EfcOrderProRangeReducer.class);
            job1.setJarByClass(EfcOrderProRangeOdJob.class);

            // Job 2: aggregate the join output and write it to the HBase table
            Job job2 = Job.getInstance(conf, "pt_eft_order_pro_range_second");
            FileInputFormat.setInputPaths(job2, output);
            job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
            job2.setMapOutputKeyClass(Text.class);
            job2.setMapOutputValueClass(IntWritable.class);
            TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);

            return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    public static class TextPair implements WritableComparable<TextPair> {

        private Text first;
        private Text second;

        public TextPair() {
            set(new Text(), new Text());
        }

        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }

        public TextPair(Text first, Text second) {
            set(first, second);
        }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        public Text getFirst() {
            return first;
        }

        public Text getSecond() {
            return second;
        }

        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        // Only the first field (the join key) is compared
        public int compareTo(TextPair tp) {
            return first.compareTo(tp.first);
        }
    }
}
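One detail worth checking with this kind of pair key: run() never sets a partitioner and TextPair does not override hashCode(), so the default HashPartitioner falls back to Object's identity hash and records with the same join key may land in different reduce partitions. If the real project does not already handle this elsewhere, the usual fix is to partition on the first (join-key) field only. A sketch, assuming the TextPair above; FirstPartitioner is not part of the original code:

import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner that routes records by the join key only
public static class FirstPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// In run(), after the MultipleInputs calls:
// job1.setPartitionerClass(FirstPartitioner.class);

Overriding hashCode() and equals() in TextPair to delegate to the first field would achieve the same effect with the default HashPartitioner.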
Mapper 1 class
public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text> {

    private static final int ORDER_ID_INDEX = 2;
    private static final int ORDER_STATUS_INDEX = 5;
    private static final String EFFECTIVE_STATUS = "3";
    private static final String COL_SPLITER = "\001"; // Ctrl-A, Hive's default field delimiter

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] order = value.toString().split(COL_SPLITER);
            String orderId = order[ORDER_ID_INDEX];
            String status = order[ORDER_STATUS_INDEX