日期:2014-05-16  浏览次数:20408 次

Hadoop中DBInputFormat和DBOutputFormat使用

一、背景

为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过

DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

二、技术细节

1、DBInputFormat(Mysql为例),先创建表:
CREATE TABLE studentinfo (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(32) NOT NULL);
2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。
3、DBInputFormat用法如下:
public class DBInput {
// DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
// CREATE TABLE studentinfo (
// id INTEGER NOT NULL PRIMARY KEY,
// name VARCHAR(32) NOT NULL);

public static class StudentinfoRecord implements Writable, DBWritable {
int id;
String name;
public StudentinfoRecord() {

}
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}
public String toString() {
return new String(this.id + " " + this.name);
}
}
public class DBInputMapper extends MapReduceBase implements
Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
public void map(LongWritable key, StudentinfoRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id), new Text(value
.toString()));
}
}
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBInput.class);
DistributedCache.addFileToClassPath(new Path(
"/lib/mysql-connector-java-5.1.0-bin.jar"), conf);

conf.setMapperClass(DBInputMapper.class);
conf.setReducerClass(IdentityReducer.class);

conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);

conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
String[] fields = { "id", "name" };
DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
null, "id", fields);

JobClient.runJob(conf);
}
}

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

实现Writable的方法:
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}

实现DBWritable的方法:
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
}
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
}

b)读入Mapper的value类型是StudnetinfoRecord。

c)配置如何连入数据库,读出表studentinfo数据。
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");