When writing MapReduce jobs you frequently run into joins across multiple data sources. For simple analysis tasks Hive is good enough; for anything more involved you end up writing the MapReduce job yourself.
The multiple input sources are registered with the addInputPath method of the MultipleInputs class, one call per source, each with its own mapper.
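A minimal sketch of the call shape (the path and mapper names here are placeholders, not the ones from the actual job below):

// Each input directory gets its own InputFormat and Mapper; all the mappers
// must emit the same key/value types so a single reducer can join the sides.
MultipleInputs.addInputPath(job, new Path("/data/orders"),
        TextInputFormat.class, OrderMapper.class);
MultipleInputs.addInputPath(job, new Path("/data/addresses"),
        TextInputFormat.class, AddressMapper.class);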
The Job class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EfcOrderProRangeOdJob extends Configured implements Tool {

    // TODO: input/output paths (local paths used for testing)
    private final static String INPUT_A = "D:/order/order/";
    private final static String INPUT_B = "D:/order/address/";
    private final static String OUTPUT = "D:/testAAAAA/";
    // private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";
    private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            String start = "20130217"; // TODO: take the date from args instead of hard-coding it
            Configuration conf = ConfUtil.getConf(getConf()); // project-specific configuration helper
            conf.set("start", start);

            // Job 1: join the order and address files on the order id.
            Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");
            Path pathOrder = new Path(INPUT_A);
            Path pathAddress = new Path(INPUT_B);
            Path output = new Path(OUTPUT + start + "/");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            job1.setMapOutputKeyClass(TextPair.class);
            job1.setMapOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job1, output);
            // One addInputPath call per source, each with its own mapper.
            MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
            MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);
            job1.setReducerClass(EfcOrderProRangeReducer.class);
            job1.setJarByClass(EfcOrderProRangeOdJob.class);

            // Job 2: aggregate the join output and write it to the HBase table.
            Job job2 = Job.getInstance(conf, "pt_eft_order_pro_range_second");
            FileInputFormat.setInputPaths(job2, output);
            job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
            job2.setMapOutputKeyClass(Text.class);
            job2.setMapOutputValueClass(IntWritable.class);
            TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);

            return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range"); // project-specific job chaining helper
        } catch (Exception e) {
            e.printStackTrace();
            return 1; // non-zero so a failed run is not reported as success
        }
    }

    public static class TextPair implements WritableComparable<TextPair> {

        private Text first;
        private Text second;

        public TextPair() {
            set(new Text(), new Text());
        }

        public TextPair(String first, String second) {
            set(new Text(first), new Text(second));
        }

        public TextPair(Text first, Text second) {
            set(first, second);
        }

        public void set(Text first, Text second) {
            this.first = first;
            this.second = second;
        }

        public Text getFirst() {
            return first;
        }

        public Text getSecond() {
            return second;
        }

        public void write(DataOutput out) throws IOException {
            first.write(out);
            second.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            first.readFields(in);
            second.readFields(in);
        }

        // Sorting and grouping consider only the first field (the join key).
        public int compareTo(TextPair tp) {
            return first.compareTo(tp.first);
        }
    }
}
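One caveat about the TextPair key: the default HashPartitioner partitions on the key object's hashCode(), which TextPair does not override, so with more than one reduce task records carrying the same join key are not guaranteed to land on the same reducer. A sketch of a partitioner that routes on the first (join-key) field only; the FirstPartitioner class and the setPartitionerClass call are additions of mine, not part of the original job:

// needs org.apache.hadoop.mapreduce.Partitioner
public static class FirstPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        // Route on the join key only, so order and address records
        // for the same key always meet in the same reducer.
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// registered on job1 alongside the other settings:
job1.setPartitionerClass(FirstPartitioner.class);

Since compareTo already orders and groups on first only, the partitioner is the missing piece; alternatively TextPair itself could override hashCode() to delegate to first.hashCode().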
Mapper 1 (the order-side mapper):
public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text> {

    private static final int ORDER_ID_INDEX = 2;
    private static final int ORDER_STATUS_INDEX = 5;
    private static final String EFFECTIVE_STATUS = "3";
    private static final String COL_SPLITER = "\001"; // Hive's default field delimiter

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] order = value.toString().split(COL_SPLITER);
            String orderId = order[ORDER_ID_INDEX];
            String status = order[ORDER_STATUS_INDEX];