// Date: 2014-05-20  Views: 21043
package wadihu.crawl;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/**
 * Crawler class responsible for downloading web pages.
 *
 * <p>Host names are read from standard input, one per line. Each host becomes a
 * {@link Target}; a background {@code RW} thread multiplexes all target sockets through a
 * single {@link Selector}, sends a {@code GET /} request on each one and prints whatever the
 * server answers. Typing {@code bye} stops the background thread.
 *
 * <p>Note: the TCP connect is performed in blocking mode inside the {@link Target}
 * constructor; only the request/response I/O that follows is non-blocking.
 */
public class CrawlOrder1 {

    /** Charset used to encode the outgoing request line and headers. */
    private static final Charset REQUEST_CHARSET = Charset.forName("US-ASCII");

    /** Stop flag for the RW thread; volatile because it is written by the input thread. */
    private volatile boolean shutdown = false;

    /** Selector on which every target channel is registered. */
    private final Selector selector;

    /** Targets accepted from the user but not yet registered with the selector. */
    private final Queue<Target> targetLists = new LinkedList<Target>();

    /**
     * Opens the selector, starts the read/write thread and then blocks reading targets from
     * standard input until {@code bye} or end of input.
     *
     * @throws IOException if the selector cannot be opened or standard input fails
     */
    public CrawlOrder1() throws IOException {
        selector = Selector.open();
        RW rw = new RW();
        rw.start();
        System.out.println("读写线程已启动...");
        receiveTarget();
    }

    /**
     * Reads host names from standard input and queues one {@link Target} per line.
     *
     * <p>Blank lines are skipped. A host that cannot be resolved or connected is reported on
     * standard error and does not end the input loop. The line {@code bye} sets the shutdown
     * flag, wakes the selector and returns.
     *
     * @throws IOException if reading standard input fails
     */
    public void receiveTarget() throws IOException {
        BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
        String msg;
        while ((msg = buf.readLine()) != null) {
            String line = msg.trim();
            if (line.length() == 0) {
                continue;
            }
            if (line.equals("bye")) {
                shutdown = true;
                selector.wakeup();
                System.out.println("系统已经停止");
                break;
            }
            try {
                addTarget(new Target(line));
            } catch (IOException e) {
                // One bad host must not kill the input loop while the RW thread keeps running.
                System.err.println("Cannot connect to " + line + ": " + e);
            }
        }
    }

    /**
     * Adds a target to the pending queue and wakes the selector so that the RW thread
     * registers it promptly.
     *
     * @param target the connected target to crawl; must not be {@code null}
     * @throws NullPointerException if {@code target} is {@code null}
     * @throws IOException declared for compatibility with existing callers
     */
    public void addTarget(Target target) throws IOException {
        if (target == null) {
            throw new NullPointerException("target");
        }
        synchronized (targetLists) {
            targetLists.add(target);
        }
        selector.wakeup();
    }

    /**
     * Drains the pending queue, registering each target's channel with the selector for
     * write and read readiness and attaching the {@link Target} to the resulting key.
     */
    public void registerRW() {
        synchronized (targetLists) {
            while (!targetLists.isEmpty()) {
                Target target = targetLists.poll();
                try {
                    target.socketChannel.register(
                            selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, target);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Handles every key currently in the selector's selected-key set: a writable key gets the
     * HTTP request sent, a readable key gets its response data decoded and printed.
     *
     * <p>An I/O failure on one channel is reported, that channel is closed, and processing
     * continues with the remaining keys.
     *
     * @throws IOException declared for compatibility with existing callers
     */
    public void processSelectdRWKeys() throws IOException {
        for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
            SelectionKey selectionKey = it.next();
            it.remove();
            if (!selectionKey.isValid()) {
                continue;
            }
            try {
                if (selectionKey.isWritable()) {
                    sendRequest(selectionKey);
                } else if (selectionKey.isReadable()) {
                    readResponse(selectionKey);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // A failed channel would otherwise stay registered and fail again on every pass.
                closeKey(selectionKey);
            }
        }
    }

    /** Writes the (remaining) request bytes; switches the key to read-only once all are sent. */
    private void sendRequest(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        Target target = (Target) selectionKey.attachment();
        if (target.pendingRequest == null) {
            // "Connection: close" makes the server end the stream after the response, so the
            // read side sees end-of-stream and the channel gets released.
            String head = "GET / HTTP/1.1\r\nHOST:" + target.host + "\r\n"
                    + "Accept:*/*\r\n"
                    + "User-Agent: Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1;)\r\n"
                    + "Connection: close\r\n\r\n";
            target.pendingRequest = REQUEST_CHARSET.encode(head);
        }
        // A non-blocking write may accept only part of the buffer; the rest is retried on the
        // next writable event.
        socketChannel.write(target.pendingRequest);
        if (!target.pendingRequest.hasRemaining()) {
            // interestOps keeps the Target attachment, unlike re-registering the channel.
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Reads available bytes, prints the decoded text and closes the channel at end of stream. */
    private void readResponse(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        Target target = (Target) selectionKey.attachment();
        ByteBuffer in = target.readBuffer;

        int ret = socketChannel.read(in);
        boolean endOfStream = ret < 0;

        in.flip();
        int capacity = (int) Math.ceil(in.remaining() * target.decoder.maxCharsPerByte()) + 1;
        CharBuffer out = CharBuffer.allocate(capacity);
        // Streaming decode: bytes of a character split across two reads stay in the buffer
        // (via compact) until the rest arrives, instead of being decoded as garbage.
        target.decoder.decode(in, out, endOfStream);
        if (endOfStream) {
            target.decoder.flush(out);
        }
        in.compact();

        out.flip();
        if (out.hasRemaining()) {
            System.out.println(out);
        }
        if (endOfStream) {
            closeKey(selectionKey);
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) a close failure. */
    private static void closeKey(SelectionKey selectionKey) {
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes queued and registered channels, then the selector itself. */
    private void closeAll() {
        synchronized (targetLists) {
            while (!targetLists.isEmpty()) {
                Target pending = targetLists.poll();
                if (pending != null) {
                    try {
                        pending.socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        if (selector.isOpen()) {
            // Snapshot the key set so closing channels cannot interfere with the iteration.
            SelectionKey[] keys = selector.keys().toArray(new SelectionKey[0]);
            for (int i = 0; i < keys.length; i++) {
                closeKey(keys[i]);
            }
        }
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Read/write thread: registers new targets and services ready channels until shutdown. */
    private class RW extends Thread {
        @Override
        public void run() {
            try {
                while (!shutdown) {
                    try {
                        registerRW();
                        selector.select(500);
                        // Always drain the selected-key set: select() only counts keys whose
                        // ready set changed, so a zero return does not mean the set is empty.
                        processSelectdRWKeys();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                closeAll();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new CrawlOrder1();
    }
}

/**
 * One crawl task: a host together with its connected socket channel and the per-connection
 * state needed to send the request and decode the response.
 */
class Target {

    /** Size in bytes of the per-connection read buffer. */
    static final int READ_BUFFER_SIZE = 1024;

    /** Host string exactly as supplied to the constructor; used for the Host header. */
    final String host;

    SocketAddress address;
    SocketChannel socketChannel;

    /** Bytes read from the channel that have not been decoded yet. */
    final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);

    /** Stateful gb2312 decoder; malformed or unmappable input is replaced rather than thrown. */
    final CharsetDecoder decoder = Charset.forName("gb2312").newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);

    /** Encoded request still to be written; {@code null} until the first write attempt. */
    ByteBuffer pendingRequest;

    /**
     * Resolves {@code host}, connects to it on port 80 (blocking) and then switches the
     * channel to non-blocking mode.
     *
     * @param host host name or address to connect to
     * @throws IOException if resolution, connection or mode switching fails
     */
    public Target(String host) throws IOException {
        this.host = host;
        address = new InetSocketAddress(InetAddress.getByName(host), 80);
        this.socketChannel = SocketChannel.open(address);
        try {
            this.socketChannel.configureBlocking(false);
        } catch (IOException e) {
            // Do not leak the connected socket if it cannot be made non-blocking.
            this.socketChannel.close();
            throw e;
        }
    }
}