日期:2014-05-20  浏览次数:20907 次

java NIO读出数据比写入数据少
简单写了个C/S模式的NIO,例子:客户端向服务器写50000次数据,服务接受到数据后打印出来。
由于首次使用NIO,碰到个问题:通过计数显示,客户端写了50000次。
但是,服务器端执行读操作时,每次读到10000次左右时就不再读了,跳回了selector.select()监听。
按理说,数据没读完,我每次读取完也执行了key.interestOps(SelectionKey.OP_READ),应该是能直到数据读取完才对,才会回到事件监听。
可是,现在就是不对了,求教。是我理解有错,代码不会,还是什么原因?
客户端代码:
Java code

public class MyMQService{

    // 信道选择器
    private Selector selector = null;

    // 与服务器通信的信道
    private SocketChannel socketChannel = null;

    // 要连接的服务器Ip地址
    private String hostIp = "localhost";

    // 要连接的远程服务器在监听的端口
    private int hostListenningPort = 33445;

    private byte[] buffer = new byte[256];
    private static int count;

    /**
     * 构造函数
     */
    public MyMQService() {
        try {
            initialize();
        } catch (IOException e) {
            System.out.println("初始化服务器连接异常" + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 初始化函数
     *
     * @throws IOException 异常
     */
    private void initialize() throws IOException {
        // 打开监听信道并设置为非阻塞模式
        socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort));
        socketChannel.configureBlocking(false);
        // 打开并注册选择器到信道
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

        // 启动读取线程
        new TCPClientReadThread(selector);
    }

   
    public void put(Object obj) throws IOException {
        synchronized (this) {
            count++;
            System.out.println("第@"+count);
            Message meg = new Message(1, obj);
            // 对象转数组
            byte[] objarr = NioUtil.obj2Byte(meg);
            // 将对象赋值给固定长度的数组
            System.arraycopy(objarr, 0, buffer, 0, objarr.length);
            // 发送本次对象
            socketChannel.write(ByteBuffer.wrap(buffer));
        }
    }
    
    public static void main(String[] args) throws IOException {
        MyMQService client1 = new MyMQService();
        for (int i = 0; i < 10000; i++) {
            client1.put("dataxxxx");
        }
    }
}

public class TCPClientReadThread implements Runnable {
    
    private Selector selector;
    
    // 超时时间,单位毫秒
    private static final int TimeOut = 3000;
    
    public TCPClientReadThread(Selector selector) {
        this.selector = selector;
        
        new Thread(this).start();
    }
    
    public void run() {
        // Object obj = null;
        try {
            while (true) {
                // 等待某信道就绪(或超时)
                if (selector.select(TimeOut) == 0) {
                    // System.out.println("客户端运行中……");
                    continue;
                }
                // 遍历每个有可用IO操作Channel对应的SelectionKey
                for (SelectionKey key : selector.selectedKeys()) {
                    // 如果该SelectionKey对应的Channel中有可读的数据
                    if (key.isReadable()) {
                        ClientTCPProtocol ci = new ClientTCPProtocol(1024);
                        ci.handleRead(key);
                        // obj = ci.getMegObj();
                        // System.out.println("服务器返回消息:" + obj);
                        // 为下一次读取作准备
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    
                    // 删除正在处理的SelectionKey
                    selector.selectedKeys().remove(key);
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}


服务器端代码:
Java code

public class MyMQServer {

    // 超时时间,单位毫秒
    private static final int TimeOut = 6000;

    // 本地监听端口
    private static final int ListenPort = 33445;

    public static void main(String[] args) throws IOException {
        // 创建选择器
        Selector selector = Selector.open();

        // 打开监听信道
        ServerSocketChannel listenerChannel = ServerSocketChannel.open();

        // 与本地端口绑定
        listenerChannel.socket().bind(new InetSocketAddress(ListenPort));

        // 设置为非阻塞模式
        listenerChannel.configureBlocking(false);

        // 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作
        listenerChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 创建一个处理协议的实现类,由它来具体操作
        TCPProtocol protocol = new ServerTCPProtocol3();

        // 反复循环,等待IO
        while (true) {
            // 等待某信道就绪(或超时)
            if (selector.select(TimeOut) == 0) {
                System.out.println("服务器运行中……");
                continue;
            }
            // 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                try {
                    if (key.isAcceptable()) {
                        // 有客户端连接请求时
                        protocol.handleAccept(key);
                    }

                    if (key.isReadable()) {
                        // 从客户端读取数据
                        protocol.handleRead(key);
                    }

                    if (key.isValid() && key.isWritable()) {
                        // 客户端可写时
                        protocol.handleWrite(key);
                    }
                } catch (IOException e) {
                    // 出现IO异常(如客户端断开连接)时移除处理过的键
                    e.printStackTrace();
                    key.channel().close();
                    continue;
                }
            }
        }
    }
}

public class ServerTCPProtocol3 implements TCPProtocol {

    private static int icount = 0;

    private byte[] allBuffer = new byte[256];

    // 当存在客户端访问时,判断是读,还是写
    public void handleAccept(SelectionKey key) throws IOException {
        SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(256));
    }

    public void handleRead(SelectionKey key) throws IOException {
        readObjByChannel(key);
    }

    /**
     * 从通道中读取对象放到服务器队列中
     *
     * @param clientChannel 客户端通道
     */
    public void readObjByChannel(SelectionKey key) throws IOException {
        icount++;
        System.out.println("进入1");
        SocketChannel channel = (SocketChannel) key.channel();
        // 拿到256长度的缓冲区,一个key一个
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        int count = channel.read(buffer);
        if (count > 0) {
            buffer.flip();
            Message meg = (Message)NioUtil.byte2Obj(buffer.array());
            // 客户端请求类型
            int type = meg.getType();
            if (type == 1) {
                // 向服务器发数据
                try {
                    System.out.println("进入2");
                    // 将对象消息放到队列中
                    System.out.println("接收到来自客户端:" + channel.socket().getRemoteSocketAddress() + "的消息," + meg.getMeg() + "@第" + icount + "次");
                    // 向客户端写成功消息
                    meg.setMeg("OK");
                    byte[] objarr =  NioUtil.obj2Byte(meg);
                    System.arraycopy(objarr, 0, allBuffer, 0, objarr.length);
                    channel.write(ByteBuffer.wrap(allBuffer));
                    buffer.clear();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } else if(count < 0){
            System.out.println("错啦");
            channel.close();
        }
        key.interestOps(SelectionKey.OP_READ);
        System.out.println("结束4");
    }

    public void handleWrite(SelectionKey key) throws IOException {
       
    }