Date: 2014-05-16
1. Writing data into HBase from a MapReduce job. This version is map-only, but the reduce phase works the same way (a sketch of the reduce-side variant follows the listing).
The code is as follows:
import java.io.IOException; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
public class HBaseImport extends Configured implements Tool {
    static final Log LOG = LogFactory.getLog(HBaseImport.class);
    public static final String JOBNAME = "MRImport";

    public static class Map extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        Configuration configuration = null;
        HTable xTable = null;
        private boolean wal = true;
        static long count = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            configuration = context.getConfiguration();
            xTable = new HTable(configuration, "testKang");
            // Buffer puts on the client side (12 MB) instead of one RPC per Put.
            xTable.setAutoFlush(false);
            xTable.setWriteBufferSize(12 * 1024 * 1024);
            wal = true;
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is tab-separated: rowkey \t value.
            String[] all = value.toString().split("\t");
            if (all.length == 2) {
                Put put = new Put(Bytes.toBytes(all[0]));
                put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"), Bytes.toBytes(all[1]));
                if (!wal) {
                    // Skipping the WAL speeds up bulk writes but risks data
                    // loss if a region server fails before a memstore flush.
                    put.setWriteToWAL(false);
                }
                xTable.put(put);
            }
            if ((++count % 100) == 0) {
                context.setStatus(count + " DOCUMENTS done!");
                context.progress();
                System.out.println(count + " DOCUMENTS done!");
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            // Push any puts still in the write buffer, then release the table.
            xTable.flushCommits();
            xTable.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String input = args[0];
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.master", "m0:60000");
        Job job = new Job(conf, JOBNAME);
        job.setJarByClass(HBaseImport.class);
        job.setMapperClass(Map.class);
        // Map-only job: writes go straight to HBase, so no reducers and no
        // job output are needed.
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, input);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int res = 1;
        try {
            res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(res);
    }
}
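The job is launched in the usual way, for example hadoop jar your-job.jar HBaseImport /input/on/hdfs (the jar name and input path here are placeholders). As noted above, the reduce phase works the same way: the HTable setup, the Put construction, and the flush/close in cleanup simply move into a Reducer. A minimal sketch of that variant, assuming Text keys and values and reusing the testKang table and xxx column family from the mapper (these choices are illustrative, not from the original post); it needs the same imports as above plus org.apache.hadoop.mapreduce.Reducer:

public static class Reduce extends Reducer<Text, Text, NullWritable, NullWritable> {
    private HTable xTable = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        xTable = new HTable(context.getConfiguration(), "testKang");
        xTable.setAutoFlush(false); // buffer puts, exactly as in the Map class
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // One Put per value, keyed by the reduce key; family and qualifier
        // mirror the map-side example.
        for (Text v : values) {
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"), Bytes.toBytes(v.toString()));
            xTable.put(put);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        xTable.flushCommits();
        xTable.close();
    }
}

To wire it in, the mapper would emit Text key/value pairs instead of writing to HBase itself, and run() would call job.setReducerClass(Reduce.class) with a nonzero reducer count in place of setNumReduceTasks(0).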
import java.io.BufferedReader; 
import java.io.File; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.ArrayList; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.client.Put;
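The second listing is cut off after its imports. Judging from BufferedReader/FileReader together with HTable/Put, it reads a local file and writes rows through the HBase client API directly, without MapReduce. A minimal sketch of that approach, assuming the same tab-separated layout, the testKang table, and the xxx column family as the first listing (the file path and batch size are also assumptions):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class FileToHBase {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testKang"); // table name taken from the first listing
        table.setAutoFlush(false);                   // batch puts client-side
        List<Put> puts = new ArrayList<Put>();
        BufferedReader reader = new BufferedReader(new FileReader("data.txt")); // path assumed
        String line;
        while ((line = reader.readLine()) != null) {
            String[] all = line.split("\t");
            if (all.length == 2) {
                Put put = new Put(Bytes.toBytes(all[0]));
                put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"), Bytes.toBytes(all[1]));
                puts.add(put);
                if (puts.size() >= 1000) { // flush in batches of 1000 rows (arbitrary size)
                    table.put(puts);
                    puts.clear();
                }
            }
        }
        table.put(puts);      // write the final partial batch
        reader.close();
        table.flushCommits(); // push anything left in the write buffer
        table.close();
    }
}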