日期:2014-05-16 浏览次数:20584 次
package com.qin.operadb; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; /*** * 封装数据库实体信息 * 的记录 * * 搜索大数据技术交流群:376932160 * * **/ public class PersonRecoder implements Writable,DBWritable { public int id;//对应数据库中id字段 public String name;//对应数据库中的name字段 public int age;//对应数据库中的age字段 @Override public void readFields(ResultSet result) throws SQLException { this.id=result.getInt(1); this.name=result.getString(2); this.age=result.getInt(3); } @Override public void write(PreparedStatement stmt) throws SQLException { stmt.setInt(1, id); stmt.setString(2, name); stmt.setInt(3, age); } @Override public void readFields(DataInput arg0) throws IOException { // TODO Auto-generated method stub this.id=arg0.readInt(); this.name=Text.readString(arg0); this.age=arg0.readInt(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(id); Text.writeString(out, this.name); out.writeInt(this.age); } @Override public String toString() { // TODO Auto-generated method stub return "id: "+id+" 年龄: "+age+" 名字:"+name; } }
package com.qin.operadb; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReadMapDB { /** * Map作业读取数据记录数 * * **/ private static class DBMap extends Mapper<LongWritable, PersonRecoder , LongWritable, Text>{ @Override protected void map(LongWritable key, PersonRecoder value,Context context) throws IOException, InterruptedException { context.write(new LongWritable(value.id), new Text(value.toString())); } } public static void main(String[] args)throws Exception { JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); // conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 // conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.211.36:3306/test", "root", "qin"); /**要读取的字段信息**/ String fileds[]=new String[]{"id","name","age"}; /**Job任务**/ Job job=new Job(conf, "readDB"); System.out.println("模式: "