Date: 2014-05-16

Reading data from MySQL with MapReduce
package com.sun.mysql;
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.SQLException;  
import java.util.Iterator;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
import org.apache.hadoop.mapreduce.lib.db.DBWritable;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 /**
  * Read data from MySQL and process it with MapReduce (the results are stored in HDFS).
  * @author asheng
  */
public class ReadDataFromMysql {  
    /**
     * Implementation of DBWritable.
     * @author asheng
     * TblsRecord represents one record read from MySQL.
     */
    public static class TblsRecord implements Writable, DBWritable 
    {  
            String tbl_name;  
            String tbl_type;  
            public TblsRecord() 
            {  
            }  
            @Override  
            public void write(PreparedStatement statement) throws SQLException 
            {
                    statement.setString(1, this.tbl_name);  
                    statement.setString(2, this.tbl_type);  
            }  
            @Override  
            public void readFields(ResultSet resultSet) throws SQLException 
            {  
                    this.tbl_name = resultSet.getString(1);  
                    this.tbl_type = resultSet.getString(2);  
            }  
            @Override  
            public void write(DataOutput out) throws IOException 
            {  
                    Text.writeString(out, this.tbl_name);  
                    Text.writeString(out, this.tbl_type);  
            }  
            @Override  
            public void readFields(DataInput in) throws IOException 
            {  
                    this.tbl_name = Text.readString(in);  
                    this.tbl_type = Text.readString(in);  
            }  
            @Override  
            public String toString() 
            {  
                    return this.tbl_name + " " + this.tbl_type;  
            }  
    }  
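    // Note: the column order used in readFields(ResultSet) / write(PreparedStatement) above must
    // match the field list later passed to DBInputFormat.setInput(...). A hypothetical call
    // (table and column names here are placeholders, not taken from this post) would be:
    //   DBInputFormat.setInput(job, TblsRecord.class, "TBLS", null, null, "TBL_NAME", "TBL_TYPE");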
    /**
     * Mapper
     * @author asheng
     * The Mapper extended below must be the one from the org.apache.hadoop.mapreduce package (the new API).
     */
public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text>
 //TblsRecord is the custom type defined above, i.e. the DBWritable implementation
{  
        public void map(LongWritable key, TblsRecord values, Context context)
                        throws IOException, InterruptedException 
        {  
                // Simply convert the record read from the database into Text and emit it to the reducer
                context.write(new Text(values.tbl_name), new Text(values.tbl_type));  
        }  
}  
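// Note: with DBInputFormat the map input key is a LongWritable record index supplied by the
// framework, and the value is one TblsRecord populated via readFields(ResultSet).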
/**
 * Reducer
 * @author asheng
 * The Reducer extended below must be the one from the org.apache.hadoop.mapreduce package (the new API).
 */
public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {  
        public void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException 
        {  
                // Iterate over the values and write each one to the output
                for (Iterator<Text> itr = values.iterator(); itr.hasNext();) 
                {  
                        context.write(key, itr.next());  
                }  
        }  
}  
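// The reducer simply echoes every (tbl_name, tbl_type) pair, so the part files under the output
// path contain one line per record read from MySQL (key and value separated by a tab with the
// default TextOutputFormat).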
public static void main(String[] args) throws Exception 
{  
        Configuration conf = new Configuration(); 

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapr
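The listing is cut off at this point. As a rough sketch only (the connection URL, database name, user, password, table and column names below are placeholders rather than the author's originals), a driver of this kind is typically completed along the following lines:

        // Hypothetical completion sketch; names are placeholders, not from the original listing
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                        "jdbc:mysql://127.0.0.1:3306/yourdb", "user", "password");

        Job job = new Job(conf, "ReadDataFromMysql");
        job.setJarByClass(ReadDataFromMysql.class);
        job.setMapperClass(ConnMysqlMapper.class);
        job.setReducerClass(ConnMysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Column order here must match readFields(ResultSet) in TblsRecord
        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(job, TblsRecord.class, "TBLS", null, null, "TBL_NAME", "TBL_TYPE");

        // The data read from MySQL (after the map/reduce pass-through) is written to HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}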