Date: 2014-05-16
1. Writing data into HBase through MapReduce, using only a map phase; the reduce phase works exactly the same way.
The code is as follows:
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseImport extends Configured implements Tool {
    static final Log LOG = LogFactory.getLog(HBaseImport.class);
    public static final String JOBNAME = "MRImport";

    public static class Map extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        Configuration configuration = null;
        HTable xTable = null;
        private boolean wal = true;
        static long count = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            configuration = context.getConfiguration();
            xTable = new HTable(configuration, "testKang");
            // Buffer puts on the client side and flush them in batches.
            xTable.setAutoFlush(false);
            xTable.setWriteBufferSize(12 * 1024 * 1024);
            wal = true;
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is expected to be "rowkey<TAB>value".
            String[] all = value.toString().split("\t");
            if (all.length == 2) {
                Put put = new Put(Bytes.toBytes(all[0]));
                put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"), Bytes.toBytes(all[1]));
                if (!wal) {
                    put.setWriteToWAL(false);
                }
                xTable.put(put);
                if ((++count % 100) == 0) {
                    context.setStatus(count + " DOCUMENTS done!");
                    context.progress();
                    System.out.println(count + " DOCUMENTS done!");
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            // Flush any buffered puts before closing the table.
            xTable.flushCommits();
            xTable.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String input = args[0];
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.master", "m0:60000");
        Job job = new Job(conf, JOBNAME);
        job.setJarByClass(HBaseImport.class);
        job.setMapperClass(Map.class);
        // Map-only job: no reducers, and no MapReduce output files are written.
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, input);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int res = 1;
        try {
            res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(res);
    }
}
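The job above would typically be launched with something like "hadoop jar hbase-import.jar HBaseImport /path/to/input", where the jar name and input path are placeholders.

As noted above, the same pattern works in a reduce phase: open the HTable in setup(), buffer Puts while processing records, and flush them in cleanup(). Below is a minimal sketch of such a reducer; it reuses the table name, column family, and qualifier from the example above, but the reducer class itself and its (rowkey, value) input layout are illustrative assumptions, not code from the original post.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: receives (rowkey, value) pairs from the map phase
// and writes them into HBase the same way the map-only version does.
public class HBaseImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
    private HTable table = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        table = new HTable(HBaseConfiguration.create(context.getConfiguration()), "testKang");
        // Buffer puts client-side, same as in the mapper above.
        table.setAutoFlush(false);
        table.setWriteBufferSize(12 * 1024 * 1024);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"), Bytes.toBytes(value.toString()));
            table.put(put);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Push any buffered puts before closing the table.
        table.flushCommits();
        table.close();
    }
}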
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put