日期:2014-05-16  浏览次数:20953 次

Apache MINA (4) 接收处理请求的过程

上一篇博客 Apache MINA (3) NioSocketAcceptor初始化 了解了NioSocketAcceptor的初始化过程,完成了初始化,Acceptor线程被阻塞,处于等待客户端请求到达的状态,通过进一步研究源代码了解Mina处理请求的过程。

当有客户端的请求到达时selector.select()被唤醒

?

if (selected > 0) {
                        // We have some connection request, let's process 
                        // them here. 
                        processHandles(selectedHandles());
                    }

?

?processHandles(selectedHandles())负责处理所有接收到的请求,首先调用了NioSocketAcceptor的accept方法,接受来自客户端的请求,并初始化一个NioSocketSession,并将SocketChannel绑定到NioSession中,指定了处理该NioSession的NioProcesser,IoServices,IoFilterChain等,最后执行

?

session.getProcessor().add(session);?

?

把session交给SimpleIoProcessorPool处理。

?

?

上一篇博客中介绍了SimpleIoProcessorPool的初始化,他初始化了一个无界线程池this.executor = Executors.newCachedThreadPool();并定义了一个cpu+1个NioProcessor绑定到这个executor中,每个NioProcessor中初始化了一个Selector,提供Nio相关的具体实现,如select(),read(),write()等方法

?

SimpleIoProcessorPool的add(S session)方法首先指定一个处理session的NioProcessor;其中getProcessor(session),该方法会先从session的attributeMap中找有没有指定好的Processor,如果没有,根据sessionId和cpu+1取模取到一个NioProcessor放到attributeMap中。

processor = pool[Math.abs((int) session.getId()) % pool.length];
session.setAttributeIfAbsent(PROCESSOR, processor);
拿到对应的NioProcessor之后再执行NioPorcessor的add(S sission)方法,实现类是在其父类AbstractPollingIoProcessor中,session会被放到一个线程安全的队列newSessions中
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
?然后启动processor,执行startupProcessor();
该方法初始化一个AbstractPollingIoProcessor的内部线程类Processor并放到executor中执行?
executor.execute(new NamePreservingRunnable(processor, threadName));
?processor开始执行,调用NioProcessor的select()方法,然后执行handleNewSessions(),处理newSessions队列中的session,执行addNow()方法,拿到session对应的SelectableChannel,设置为非阻塞的通道,注册通道为OP_READ,准备读取数据,同时为session构建filterChain,然后通知listeners激活创建事件,激活过滤器当前session创建和打开事件。

然后执行process()方法开始读取数据,初始化IoBuffer
IoBuffer buf = IoBuffer.allocate(bufferSize);
?调用NioProcessor的read()方法从channel中读取数据到IoBuffer中,这里指定的默认读取方式是读到buffer满或者数据读完然后结束然后调用buf.flip();重置buffer的指针为初始化状态
然后调用责任链激活数据已经读取到
filterChain.fireMessageReceived(buf);
?如果数据已经读完,会将当前session放到flushingSessions队列中。
filterChain负责将从chain中读取的byte数据转换成业务需要的数据,如对象,字符串等,从之前Hellow World的例子中看到,为这个责任链添加了一个按行读取数据的filter
chain.addLast("myChain", new ProtocolCodecFilter(new TextLineCodecFactory())); 
?构造方法中指定了按行处理的数据的encoder和decoder,如果没有读到换行,则将临时数据存到TextLineDecoder的内部类Context中,直到读到换行,将整行放到AbstractProtocolDecoderOutput的消息队列中,如果消息队列不为空,才将转换好的数据传递给下一个filter
Queue<Object> messageQueue = getMessageQueue();
            
            while (!messageQueue.isEmpty()) {
                nextFilter.messageReceived(session, messageQueue.poll());
            }
?这个时候消息进入到对应业务处理的IoHandler的实现类中去完成业务的相关操作,在之前的例子中创建了一个对应的实现类SimpleMinaServerHandler来处理
@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {

		String str = (String)message;

        // 打印客户端
		System.out.println("receive client message : [ " + str +" ]");

		// 回写消息给客户端
		session.write(count.incrementAndGet());

	}
?然后再调用session的write方法将处理后的信息回写给客户端
初始化WriteRequest,交给filterChain,执行fireFilterWrite回写数据给客户端

小结
以上过程描述了一次简单的客户端请求处理的过程,其核心过程如下: