日期:2014-05-16  浏览次数:20577 次

morphia的mapreduce示例

mongodb有一个类似于orm映射的框架:morphia。其性质就类似于hibernate一样。 但它对mapreduce方法的包装并不是很方便使用,下面示例如下:?

?

public List<WeixinBean> getNewMessage(long updateTime) {
		
		//按用户分组,得到最小发送时间和最大更新时间
		String map = "function(){emit( this.openID+'#'+this.officalID,{ postTime: this.postTime,updateTime: this.updateTime} ); }";
		
		String function = "function(v){"
				+ "if(res.postTime == 0){"
				+ "res.postTime = v.postTime;"
				+ "}else if(res.postTime > v.postTime){"
				+ "res.postTime = v.postTime;"
				+ "}"
				+ "if(res.updateTime < v.updateTime){"
				+ "res.updateTime = v.updateTime;"
				+ "}"
				+ "}";
		String reduce = "function(key,values){"
				+ "var res = { postTime: 0,updateTime: 0}; "
				+ "values.forEach( "
				+ function
				+ " );"
				+ "return res;"
				+ "}";
		
		String outputCollection = "test";
		
		//在执行mapreduce之前,先查询出指定时间范围的数据作为map的输入
		BasicDBObject queryForMap = new BasicDBObject();  
//		if(compareCondition.equals(">")){
			//此处特别注意:不要使用put方法,否则查询无效,比如:new BasicDBObject().put("$gt", updateTime)
			queryForMap.append("updateTime", new BasicDBObject().append("$gt", updateTime));
//		}
		
		DBCollection weixinCollection = ds.getCollection(WeixinBean.class);
		MapReduceCommand cmd = new MapReduceCommand(
				weixinCollection,
				map,
				reduce,
				null,
				OutputType.INLINE,
				queryForMap);
		
		
		MapReduceOutput out = weixinCollection.mapReduce(cmd);
		
		CommandResult cr = out.getCommandResult();
		BasicDBList list = (BasicDBList)cr.get("results");
		List<WeixinBean> weixinList = new ArrayList<WeixinBean>();
		for(int i = 0 ; i < list.size() ; i++){
			BasicDBObject weixinObject = (BasicDBObject)list.get(i);
			System.out.print(weixinObject.getString("_id"));
			BasicDBObject value = (BasicDBObject)weixinObject.get("value");
			System.out.println(",postTime="+value.getLong("postTime") + ",updateTime="+ value.getLong("updateTime"));
			
			WeixinBean wb = new WeixinBean();
			String[] userIdArray = weixinObject.getString("_id").split("#");
			wb.setId(weixinObject.getString("_id"));
//			wb.setopenID(userIdArray[0]);
//			wb.setofficalID(userIdArray[1]);
			wb.setPostTime(value.getLong("postTime"));
			wb.setUpdateTime(value.getLong("updateTime"));
			weixinList.add(wb);
			
		}
		
		return weixinList;
		
		
//		DBCollection resultColl = out.getOutputCollection();    
//        DBCursor cursor= resultColl.find();    
//        while (cursor.hasNext()) {    
//            System.out.println(cursor.next());    
//        } 
		
//		MapreduceResults<WeixinBean> res = this.ds.mapReduce(MapreduceType.REDUCE, query, WeixinBean.class, cmd);
		
		
//		MapreduceResults<WeixinBean> res = ds.mapReduce(MapreduceType.INLINE, query, map, reduce, null, null, WeixinBean.class);
//		
//		res.setInlineRequiredOptions(WeixinBean.class, ds.getMapper(), new DefaultEntityCache());
//		Iterator<WeixinBean> it = res.getInlineResults();
////		Iterator<WeixinBean> it = res.iterator();
//		while(it.hasNext()){
//			WeixinBean wb = it.next();
//			System.out.println(wb.getId()+"-->"+wb.getPostTime());
//		}
//		
//		System.out.println(res.getCounts());
		
		
		
	}

?