日期:2014-05-16  浏览次数:20486 次

hadoop 二次排序 插入数据库

???? 二次排序:根据自定义对象的compareTo 方法排序

??? 由下面的代码实现可以看出 二次排序的实质是 先根据第一个字段排完序后再排第二个字段

若还有第三个字段参与进来是否可以叫作三次排序呢? ?(?_ ?)

?

???? 另:根据程序断点初步判断?

设置job的sort?? 会在mapper 至combiner阶段执行

设置job的group会在combiner至reduce 阶段执行

不过在从combiner到reduce的时候若传递的key为自定义的对象即使重写了hashcode 和equals 方法也不会当成相同的key来处理 不得已在本程序中传输key为一个空Text()

?? 不知是否有别的方法可以实现? ?

?

插入数据库的操作在 附件中有详细的实现.

?

package hdfs.demo2.final_indb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Demo2_3Mapp { 
 
	/**
	 * 用户自定义对象 保存
	 * @author Administrator
	 *
	 */
	public static class TopTenPair implements WritableComparable<TopTenPair>, DBWritable, Writable  {
		int prodid; //商品编码
		int price;  //商品价格
		int count;  //商品销售数量
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, prodid);
			statement.setInt(2, price);
			statement.setInt(3, count);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.prodid = resultSet.getInt(1);
			this.price = resultSet.getInt(2);
			this.count = resultSet.getInt(3);
		}
		
		/**
		 * Set the prodId and price and count  values.
		 */
		public void set(int prodid, int price, int count) {
			this.prodid = prodid;
			this.price = price;
			this.count = count;
		}

		public int getProdid() {
			return prodid;
		}

		public int getPrice() {
			return price;
		}	
		public int getCount() {
			return count;
		}

		@Override
		// 反序列化,从流中的二进制转换成IntPair
		public void readFields(DataInput in) throws IOException {
			prodid = in.readInt();
			price = in.readInt();
			count  = in.readInt();
		}

		@Override
		// 序列化,将IntPair转化成使用流传送的二进制
		public void write(DataOutput out) throws IOException {
			out.writeInt(prodid);
			out.writeInt(price);
			out.writeInt(count);
		}

		@Override
		// key的比较
		public int compareTo(TopTenPair o) {
			if ( o.count ==count) {
				if( o.count==0){
					return  o.prodid - prodid;
				}
				return  o.price-price;
			}
			return o.count-count;
		}

		// 新定义类应该重写的两个方法
		@Override
		public int hashCode() {
			return count+prodid*3 ;
		}

		@Override
		public boolean equals(Object right) {
			if (right == null)
				return false;
			if (this == right)
				return true;
			if (right instanceof TopTenPair) {
				TopTenPair r = (TopTenPair) right;
				return r.prodid == prodid && r.price == price&& r.count == count;
			} else {
				return false;
			}
		}
		@Override
		public String toString(){
			return getProdid()+"\t"+getPrice()+"\t"+getC