日期:2014-05-20 浏览次数:21108 次
public class MyMQService{
// 信道选择器
private Selector selector = null;
// 与服务器通信的信道
private SocketChannel socketChannel = null;
// 要连接的服务器Ip地址
private String hostIp = "localhost";
// 要连接的远程服务器在监听的端口
private int hostListenningPort = 33445;
private byte[] buffer = new byte[256];
private static int count;
/**
* 构造函数
*/
public MyMQService() {
try {
initialize();
} catch (IOException e) {
System.out.println("初始化服务器连接异常" + e.getMessage());
e.printStackTrace();
}
}
/**
* 初始化函数
*
* @throws IOException 异常
*/
private void initialize() throws IOException {
// 打开监听信道并设置为非阻塞模式
socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort));
socketChannel.configureBlocking(false);
// 打开并注册选择器到信道
selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
// 启动读取线程
new TCPClientReadThread(selector);
}
public void put(Object obj) throws IOException {
synchronized (this) {
count++;
System.out.println("第@"+count);
Message meg = new Message(1, obj);
// 对象转数组
byte[] objarr = NioUtil.obj2Byte(meg);
// 将对象赋值给固定长度的数组
System.arraycopy(objarr, 0, buffer, 0, objarr.length);
// 发送本次对象
socketChannel.write(ByteBuffer.wrap(buffer));
}
}
public static void main(String[] args) throws IOException {
MyMQService client1 = new MyMQService();
for (int i = 0; i < 10000; i++) {
client1.put("dataxxxx");
}
}
}
public class TCPClientReadThread implements Runnable {
private Selector selector;
// 超时时间,单位毫秒
private static final int TimeOut = 3000;
public TCPClientReadThread(Selector selector) {
this.selector = selector;
new Thread(this).start();
}
public void run() {
// Object obj = null;
try {
while (true) {
// 等待某信道就绪(或超时)
if (selector.select(TimeOut) == 0) {
// System.out.println("客户端运行中……");
continue;
}
// 遍历每个有可用IO操作Channel对应的SelectionKey
for (SelectionKey key : selector.selectedKeys()) {
// 如果该SelectionKey对应的Channel中有可读的数据
if (key.isReadable()) {
ClientTCPProtocol ci = new ClientTCPProtocol(1024);
ci.handleRead(key);
// obj = ci.getMegObj();
// System.out.println("服务器返回消息:" + obj);
// 为下一次读取作准备
key.interestOps(SelectionKey.OP_READ);
}
// 删除正在处理的SelectionKey
selector.selectedKeys().remove(key);
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
public class MyMQServer {
// 超时时间,单位毫秒
private static final int TimeOut = 6000;
// 本地监听端口
private static final int ListenPort = 33445;
public static void main(String[] args) throws IOException {
// 创建选择器
Selector selector = Selector.open();
// 打开监听信道
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
// 与本地端口绑定
listenerChannel.socket().bind(new InetSocketAddress(ListenPort));
// 设置为非阻塞模式
listenerChannel.configureBlocking(false);
// 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作
listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建一个处理协议的实现类,由它来具体操作
TCPProtocol protocol = new ServerTCPProtocol3();
// 反复循环,等待IO
while (true) {
// 等待某信道就绪(或超时)
if (selector.select(TimeOut) == 0) {
System.out.println("服务器运行中……");
continue;
}
// 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isAcceptable()) {
// 有客户端连接请求时
protocol.handleAccept(key);
}
if (key.isReadable()) {
// 从客户端读取数据
protocol.handleRead(key);
}
if (key.isValid() && key.isWritable()) {
// 客户端可写时
protocol.handleWrite(key);
}
} catch (IOException e) {
// 出现IO异常(如客户端断开连接)时移除处理过的键
e.printStackTrace();
key.channel().close();
continue;
}
}
}
}
}
public class ServerTCPProtocol3 implements TCPProtocol {
private static int icount = 0;
private byte[] allBuffer = new byte[256];
// 当存在客户端访问时,判断是读,还是写
public void handleAccept(SelectionKey key) throws IOException {
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(256));
}
public void handleRead(SelectionKey key) throws IOException {
readObjByChannel(key);
}
/**
* 从通道中读取对象放到服务器队列中
*
* @param clientChannel 客户端通道
*/
public void readObjByChannel(SelectionKey key) throws IOException {
icount++;
System.out.println("进入1");
SocketChannel channel = (SocketChannel) key.channel();
// 拿到256长度的缓冲区,一个key一个
ByteBuffer buffer = (ByteBuffer) key.attachment();
int count = channel.read(buffer);
if (count > 0) {
buffer.flip();
Message meg = (Message)NioUtil.byte2Obj(buffer.array());
// 客户端请求类型
int type = meg.getType();
if (type == 1) {
// 向服务器发数据
try {
System.out.println("进入2");
// 将对象消息放到队列中
System.out.println("接收到来自客户端:" + channel.socket().getRemoteSocketAddress() + "的消息," + meg.getMeg() + "@第" + icount + "次");
// 向客户端写成功消息
meg.setMeg("OK");
byte[] objarr = NioUtil.obj2Byte(meg);
System.arraycopy(objarr, 0, allBuffer, 0, objarr.length);
channel.write(ByteBuffer.wrap(allBuffer));
buffer.clear();
} catch (Exception e) {
e.printStackTrace();
}
}
} else if(count < 0){
System.out.println("错啦");
channel.close();
}
key.interestOps(SelectionKey.OP_READ);
System.out.println("结束4");
}
public void handleWrite(SelectionKey key) throws IOException {
}