日期:2014-05-16  浏览次数:20584 次

如何使用Hadoop读写数据库
在我们的一些应用程序中,常常避免不了要与数据库进行交互,而在我们的hadoop中,有时候也需要和数据库进行交互,比如说,数据分析的结果存入数据库,或者是,读取数据库的信息写入HDFS上,不过直接使用MapReduce操作数据库,这种情况在现实开发还是比较少,一般我们会采用Sqoop来进行数据的迁入,迁出,使用Hive分析数据集,大多数情况下,直接使用Hadoop访问关系型数据库,可能产生比较大的数据访问压力,尤其是在数据库还是单机的情况下,情况可能更加糟糕,在集群的模式下压力会相对少一些。

那么,今天散仙就来看下,如何直接使用Hadoop1.2.0的MR来读写操作数据库,hadoop的API提供了DBOutputFormat和DBInputFormat这两个类,来进行与数据库交互,除此之外,我们还需要定义一个类似JAVA Bean的实体类,来与数据库的每行记录进行对应,通常这个类要实现Writable和DBWritable接口,来重写里面的4个方法以对应获取每行记录里面的各个字段信息。

下面,我们先来看下如何使用MR来读取数据库的数据,并写入HDFS上,
数据表的截图如下所示,



实体类定义代码:
package com.qin.operadb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
/***
 * 封装数据库实体信息
 * 的记录
 * 
 * 搜索大数据技术交流群:376932160
 * 
 * **/
public class PersonRecoder implements Writable,DBWritable {

	public int id;//对应数据库中id字段
	public String name;//对应数据库中的name字段
	public int age;//对应数据库中的age字段
	
	
	
	
	@Override
	public void readFields(ResultSet result) throws SQLException {
	
		
		this.id=result.getInt(1);
		this.name=result.getString(2);
		this.age=result.getInt(3);
		
	}

	@Override
	public void write(PreparedStatement stmt) throws SQLException {
		
		stmt.setInt(1, id);
		stmt.setString(2, name);
		stmt.setInt(3, age);
		
	}

	@Override
	public void readFields(DataInput arg0) throws IOException {
		// TODO Auto-generated method stub
		this.id=arg0.readInt();
		this.name=Text.readString(arg0);
		this.age=arg0.readInt();
		
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeInt(id);
		Text.writeString(out, this.name);
		out.writeInt(this.age);
	}
	
	
	@Override
	public String toString() {
		// TODO Auto-generated method stub
		return "id: "+id+"  年龄: "+age+"   名字:"+name;
	}
	

}

MR类的定义代码,注意是一个Map Only作业:
package com.qin.operadb;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadMapDB {
	
	
	
	/**
	 * Map作业读取数据记录数
	 * 
	 * **/
	private static class DBMap extends Mapper<LongWritable, PersonRecoder	, LongWritable, Text>{
		@Override
		protected void map(LongWritable key, PersonRecoder value,Context context)
				throws IOException, InterruptedException {
		 
			context.write(new LongWritable(value.id), new Text(value.toString()));
		 
		}
	}
	
	public static void main(String[] args)throws Exception {
		
		
		 JobConf conf=new JobConf(ReadMapDB.class);
		//Configuration conf=new Configuration();
	   //	conf.set("mapred.job.tracker","192.168.75.130:9001");
		//读取person中的数据字段
	 //	conf.setJar("tt.jar");
		 
		//注意这行代码放在最前面,进行初始化,否则会报
		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.211.36:3306/test", "root", "qin");
		
		/**要读取的字段信息**/
		String fileds[]=new String[]{"id","name","age"};
		/**Job任务**/
		Job job=new Job(conf, "readDB");
		System.out.println("模式:  "