日期:2014-05-16  浏览次数:20904 次

Apache MINA 1.x 中的ByteBuffer对象使用的问题
最近在为公司做一个消息中心的项目,项目中使用了Apache的MINA 1.7的版本,为消息中心做通讯接口。

[size=13px; line-height: 20px; ][size=x-small;]  Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序,MINA 所支持的功能也在进一步的扩展中。[/size][/size]

    使用MINA组件确实可以提高通讯的并发性能以及可靠性,在1.7版本中,MINA封装了JDK中的ByteBuffer,虽然调用上方便了不少,但是如果不正当的使用它,容易引起性能上的问题,甚至会出现Out of Memory的问题。这次我就遇到了Out of Memory的问题,后来经过我的仔细分析和调试,终于发现了这条臭虫。

    首先来看一下如何搭建MINA的TCP的服务端:

   
        // output log to console
	org.apache.log4j.BasicConfigurator.configure();
	acceptor = new SocketAcceptor();

        // Create a service configuration
        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.setReuseAddress(true);
        // add a protocol codec, the protocol is wrapper from 0x00 to 0xff, and between 0x00 and 0xff is a message.
        cfg.getFilterChain().addLast(
                "protocolFilter",
                new ProtocolCodecFilter(
                        new WrapperTagProtocolCodecFactory()));
        // add logger filter
        cfg.getFilterChain().addLast("logger", new LoggingFilter());
        
        acceptor
        .bind(new InetSocketAddress(PORT), new IoHandlerAdapter(){
			@Override
			public void exceptionCaught(IoSession session, Throwable cause)
					throws Exception {
				// do something...
			}

			@Override
			public void messageReceived(IoSession session, Object message)
					throws Exception {
				session.write("response something").join();
			}
        	
        }, cfg);
    


    这段代码只是作为测试用例而写的,从代码中我们可以看到,代码中创建了SocketAcceptor对象,并且使用了自定义的ProtocolCodecFilter,这个通讯协议比较简单,协议格式为"<0x00><Message><0xff>",传输的消息被0x00和0xff两个字节包裹着。那么读取这条时,当读到0x00时开始,读取到0xff结束。当未读取到0x00时而接收到了一些数据,这些读取到的数据将被忽略。WrapperTagProtocolCodecFactory是一个工厂类,他将负责创建解码对象和编码对象,解码对象负责解释读取中的协议数据,并将协议数据转换为消息。而编码对象是将消息转换为协议数据。
    Socket通讯是一个不可靠的消息传输通道,网络上传输的数据,并不是一次性就从客户端送到服务端,或者是服务端送到客户端,而是通过数据链路层,将数据送过来,这样会产生数据将有可能通过多次送到目的地。而MINA中的组件使用了ByteBuffer对象作为数据接收缓存,所以需要制定合理的通讯协议,来确保接收端收到的数据是完整的,可靠的。
    当然如上的协议并不能完全的避免数据传输所产生的问题,因为还有可能产生数据丢包等其他的问题,但这个协议适合数据流小的消息。
    当Socket通道的消息经过了协议的过滤器之后,还将经过LoggingFilter的过滤,这里实际上就是将收到的消息记录到Logger中。之后,将送给IoHandler处理,上面的代码通过匿名类继承了IoHandlerAdapter,为了方便测试,这里只重载了messageReceived方法。当MINA收到消息之后,立即发送回应的数据到客户端。
    接下来,我们看看WrapperTagProtocolCodecFactory创建的两个对象:
   
public class WrapperTagProtocolCodecFactory extends
		DemuxingProtocolCodecFactory {

	public WrapperTagProtocolCodecFactory(){
		super.register(WrapperTagDecoder.class);
		super.register(WrapperTagEncoder.class);
	}

	public WrapperTagProtocolCodecFactory(String charset){
		super.register(new WrapperTagDecoder(charset));
		super.register(new WrapperTagEncoder(charset));
	}
}
    

    此工厂创建了WrapperTagDecoder(解码类)和WrapperTagEncoder(编码类)两个对象。
    WrapperTagDecoder
   
public class WrapperTagDecoder extends MessageDecoderAdapter {

	private String charset = "gb2312";
	
	private Logger logger = Logger.getLogger(WrapperTagDecoder.class);
	
	public WrapperTagDecoder(String charset){
		this.charset = charset;
	}
	
	public WrapperTagDecoder(){
		
	}

	public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
		return MessageDecoderResult.OK;
	}

	public MessageDecoderResult decode(IoSession session, ByteBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		InputStream stream = in.asInputStream();
		int head = stream.read();
		if(head == -1){
			throw new IOException("read -1 byte");
		}
		logger.info("Read head 0x00.");
		int b = -1;
		ByteArrayOutputStream os = new ByteAr