Date: 2014-05-16
I. Background
     To let MapReduce jobs access relational databases (MySQL, Oracle) directly, Hadoop provides the DBInputFormat and DBOutputFormat classes. DBInputFormat reads rows from a database table into HDFS, and DBOutputFormat writes the result set produced by a MapReduce job into a database table.
II. Technical Details
1. DBInputFormat (using MySQL as the example). First, create the table:
CREATE TABLE studentinfo (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(32) NOT NULL);
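Before the job can read anything, the table needs a few rows. A minimal JDBC sketch for seeding test data (the class name SeedStudentinfo and the sample names are invented for illustration; the driver, URL, user, and password reuse the values from the job configuration further below, so adjust them to your environment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SeedStudentinfo {
   public static void main(String[] args) throws Exception {
      Class.forName("com.mysql.jdbc.Driver");
      Connection conn = DriverManager.getConnection(
           "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
      PreparedStatement stmt = conn
           .prepareStatement("INSERT INTO studentinfo (id, name) VALUES (?, ?)");
      String[] names = { "alice", "bob", "carol" };
      for (int i = 0; i < names.length; i++) {
         stmt.setInt(1, i + 1);       // id column
         stmt.setString(2, names[i]); // name column
         stmt.executeUpdate();
      }
      stmt.close();
      conn.close();
   }
}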
2. Because version 0.20's support for DBInputFormat and DBOutputFormat is not very good, this example uses the 0.19 API to demonstrate the two classes.
3. DBInputFormat is used as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBInput {
   // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
   // CREATE TABLE studentinfo (
   // id INTEGER NOT NULL PRIMARY KEY,
   // name VARCHAR(32) NOT NULL);
   public static class StudentinfoRecord implements Writable, DBWritable {
     int id;
     String name;
     public StudentinfoRecord() {
     }
     public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
     }
     public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
     }
     public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
     }
     @Override
     public String toString() {
        return this.id + " " + this.name;
     }
   }
   public static class DBInputMapper extends MapReduceBase implements
        Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
     public void map(LongWritable key, StudentinfoRecord value,
          OutputCollector<LongWritable, Text> collector, Reporter reporter)
          throws IOException {
        collector.collect(new LongWritable(value.id),
             new Text(value.toString()));
     }
   }
   public static void main(String[] args) throws IOException {
     JobConf conf = new JobConf(DBInput.class);
     DistributedCache.addFileToClassPath(new Path(
          "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
     conf.setMapperClass(DBInputMapper.class);
     conf.setReducerClass(IdentityReducer.class);
     conf.setMapOutputKeyClass(LongWritable.class);
     conf.setMapOutputValueClass(Text.class);
     conf.setOutputKeyClass(LongWritable.class);
     conf.setOutputValueClass(Text.class);
     conf.setInputFormat(DBInputFormat.class);
     FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
     DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
     String[] fields = { "id", "name" };
     DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
          null, "id", fields);
     JobClient.runJob(conf);
   }
}
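A note on the output: with IdentityReducer and the default TextOutputFormat, each line written under /hua01 should be the map output key, a tab, and then the record's toString() value, so the row (1, alice) would come out as "1", a tab, and "1 alice". This assumes the 0.19 defaults; package the classes into a jar, make the MySQL driver available, and submit the job with the usual hadoop jar command.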
a) The member variables of the StudentinfoRecord class correspond to the table's columns, and the class implements both the Writable and DBWritable interfaces.
Methods implementing Writable:
     public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
     }
     public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
     }
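These two methods are what Hadoop uses to move the record between map and reduce. A small round-trip sketch to make that concrete (WritableRoundTrip is an invented test class; it assumes it sits in the same package as DBInput, since the record's fields are package-private):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class WritableRoundTrip {
   public static void main(String[] args) throws Exception {
      DBInput.StudentinfoRecord r = new DBInput.StudentinfoRecord();
      r.id = 1;
      r.name = "alice";
      // Serialize the record the way the shuffle would ...
      DataOutputBuffer out = new DataOutputBuffer();
      r.write(out);
      // ... then deserialize it into a fresh instance.
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      DBInput.StudentinfoRecord copy = new DBInput.StudentinfoRecord();
      copy.readFields(in);
      System.out.println(copy); // prints: 1 alice
   }
}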
Methods implementing DBWritable:
     public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
     }
     public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
     }
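On the input side, DBInputFormat builds a SELECT statement from the table name, field list, and orderBy column passed to setInput(), and hands each row of the ResultSet to readFields(ResultSet). A rough manual-JDBC equivalent (ManualRead is an invented name; the real input format additionally splits the query across map tasks with LIMIT/OFFSET clauses):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ManualRead {
   public static void main(String[] args) throws Exception {
      Class.forName("com.mysql.jdbc.Driver");
      Connection conn = DriverManager.getConnection(
           "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
      Statement stmt = conn.createStatement();
      // Roughly the query DBInputFormat derives from setInput()'s
      // table name, field list, and orderBy column.
      ResultSet results = stmt.executeQuery(
           "SELECT id, name FROM studentinfo ORDER BY id");
      while (results.next()) {
         DBInput.StudentinfoRecord r = new DBInput.StudentinfoRecord();
         r.readFields(results); // the same method the framework calls per row
         System.out.println(r);
      }
      results.close();
      stmt.close();
      conn.close();
   }
}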
b) The value type fed into the Mapper is StudentinfoRecord.
c) Configure the database connection and how the studentinfo table's data is read:
     DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");