日期:2014-05-16 浏览次数:20917 次
Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。
Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:
java NIO参考资料:
http://www.iteye.com/topic/834447
http://weixiaolu.iteye.com/blog/1479656
=========================================================================================================================
Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:
public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }
private static class Call { private int id; // 请求id private Writable param; // 请求的参数 private Connection connection; // 和Client一样,表示一个C/S间的连接 private long timestamp; // 时间戳 private ByteBuffer response; // server对此次请求的响应结果 ... }
public int readAndProcess() throws IOException, InterruptedException { //先对connection进行版本校验,校验成功后读取Header头部信息(得到客户端所用的协议和客户端的标识user) //,接着读取数据(Call.id和参数params,其中params),然后建立一个Call while (true) { /* Read at most one RPC. If the header is not read completely yet * then iterate until we read first RPC or until there is no data left. */ int count = -1; if (dataLengthBuffer.remaining() > 0) { count = channelRead(channel, dataLengthBuffer); if (count < 0 || dataLengthBuffer.remaining() > 0) return count; } if (!versionRead) {//尚未版本验证 //Every connection is expected to send the header. ByteBuffer versionBuffer = ByteBuffer.allocate(1); count = channelRead(channel, versionBuffer); if (count <= 0) { return count; } int version = versionBuffer.get(0); //要读取BufferByte前要先flip下 dataLengthBuffer.flip();//.flip();一定得有,如果没有,就是从最后开始读取的,当然读出来的都是byte=0时候的字符。 //通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置 if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + " got version " + version + " expected version " + CURRENT_VERSION); return -1; } dataLengthBuffer.clear();//清除内容 versionRead = true;//验证版本了 continue; } if (data == null) {//分配新的data dataLengthBuffer.flip(); dataLength = dataLengthBu