日期:2014-05-16  浏览次数:20444 次

存入hbase的方法

1、通过MapReduce的方式存入HBase。下面的示例只包含map阶段(reduce任务数设为0);如果改为在reduce阶段写入HBase,写法也是一样的。

代码如下:

import java.io.IOException; 

import org.apache.commons.logging.Log; 

import org.apache.commons.logging.LogFactory; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.conf.Configured; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

import org.apache.hadoop.hbase.util.Bytes; 

import org.apache.hadoop.io.LongWritable; 

import org.apache.hadoop.io.NullWritable; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Job; 

import org.apache.hadoop.mapreduce.Mapper; 

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 

import org.apache.hadoop.util.GenericOptionsParser; 

import org.apache.hadoop.util.Tool; 

import org.apache.hadoop.util.ToolRunner; 

public class HBaseImport extends Configured implements Tool{ 

static final Log LOG = LogFactory.getLog(HBaseImport.class); 

public static final String JOBNAME = "MRImport "; 

public static class Map extends Mapper<LongWritable , Text, NullWritable, NullWritable>{ 

Configuration configuration = null; 

HTable xTable = null; 

private boolean wal = true; 

static long count = 0; 

@Override 

protected void cleanup(Context context) throws IOException, 

InterruptedException { 

// TODO Auto-generated method stub 

super.cleanup(context); 

xTable.flushCommits(); 

xTable.close(); 

} 

@Override 

protected void map(LongWritable key, Text value, Context context) 

throws IOException, InterruptedException { 

String all[] = value.toString().split("/t"); 

If(all.length==2){ 

put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1])); 

} 

if (!wal) { 

put.setWriteToWAL(false); 

} 

xTable.put(put); 

if ((++count % 100)==0) { 

context.setStatus(count +" DOCUMENTS done!"); 

context.progress(); 

System.out.println(count +" DOCUMENTS done!"); 

} 

} 

@Override 

protected void setup(Context context) throws IOException, 

InterruptedException { 

// TODO Auto-generated method stub 

super.setup(context); 

configuration = context.getConfiguration(); 

xTable = new HTable(configuration,"testKang"); 

xTable.setAutoFlush(false); 

xTable.setWriteBufferSize(12*1024*1024); 

wal = true; 

} 

} 

@Override 

public int run(String[] args) throws Exception { 

String input = args[0]; 

Configuration conf = HBaseConfiguration.create(getConf()); 

conf.set("hbase.master", "m0:60000"); 

Job job = new Job(conf,JOBNAME); 

job.setJarByClass(HBaseImport.class); 

job.setMapperClass(Map.class); 

job.setNumReduceTasks(0); 

job.setInputFormatClass(TextInputFormat.class); 

TextInputFormat.setInputPaths(job, input); 

job.setOutputFormatClass(NullOutputFormat.class); 

return job.waitForCompletion(true)?0:1; 

} 

public static void main(String[] args) throws IOException { 

Configuration conf = new Configuration(); 

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 

int res = 1; 

try { 

res = ToolRunner.run(conf, new HBaseImport (), otherArgs); 

} catch (Exception e) { 

e.printStackTrace(); 

} 

System.exit(res); 

} 

} 

2、通过Java程序入库 
Java多线程读取本地磁盘上的文件,以HTable.put(put)的方式完成数据写入

import java.io.BufferedReader; 

import java.io.File; 

import java.io.FileReader; 

import java.io.IOException; 

import java.util.ArrayList; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put