日期:2014-05-16 浏览次数:20912 次
/** * 用Netty来实现上传 */ public class NettyFileUpload extends FileUpload { private NettyRequestContext context; public static final boolean isMultipartContent(HttpRequest request) { if (HttpMethod.POST != request.getMethod()) { return false; } if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) { return false; } String contentType = request.getHeaders("Content-Type").get(0); if (contentType == null) { return false; } if (contentType.toLowerCase().startsWith("multipart/")) { return true; } return false; } public NettyFileUpload(NettyRequestContext context) { this.context = context; } public NettyFileUpload(FileItemFactory fileItemFactory) { super(fileItemFactory); } public FileItemIterator getItemIterator() throws FileUploadException, IOException { return super.getItemIterator(context); } } public class NettyRequestContext implements RequestContext { private String encoding; private String contentType; private int contentLength = -1; /** * 上传的内容流 */ private InputStream inputStream; public NettyRequestContext(String encoding, String contentType, int contentLength, InputStream inputStream) { this.encoding = encoding; this.contentType = contentType; this.contentLength = contentLength; this.inputStream = inputStream; } @Override public String getCharacterEncoding() { return encoding; } @Override public String getContentType() { return contentType; } @Override public int getContentLength() { return contentLength; } @Override public InputStream getInputStream() throws IOException { // 不能直接用request的流,因为有HttpChunk return inputStream; } @Override public String toString() { return "ContentLength=" + this.getContentLength() + ", ContentType=" + this.getContentType(); } public void closeInputStream() throws IOException { getInputStream().close(); } } public class NettyChunkInputStream extends InputStream { private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128); private HttpChunk currentChunk = null; private volatile boolean closed; public boolean putChunk(HttpChunk chunk) throws IOException { if (!closed) { try { chunkQueue.put(chunk); } catch (InterruptedException e) { throw new IOException(e); } return true; } throw new IOException(" this inputstream has been closed!"); } @Override public int read() throws IOException { byte resultByte = -1; try { if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else if (!getChunk().isLast()) { nextChunk(); if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else { return -1; } } else { return -1; } } catch (InterruptedException e) { throw new IOException(e); } // InputStream.read()返回0-255之间的int return resultByte >= 0 ? resultByte : 256 + resultByte; } private HttpChunk getChunk() throws InterruptedException { if (currentChunk == null) { currentChunk = chunkQueue.take(); } return currentChunk; } private void nextChunk() throws InterruptedException { currentChunk = chunkQueue.take(); } @Override public int available() throws IOException { throw new UnsupportedOperationException("unsupport available()"); } @Override public void close() throws IOException { chunkQueue = null; closed = true; } public boolean isClosed() { return closed; } }
public class NettyUploadHandler extends SimpleChannelUpstreamHandler { private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32); private boolean hasReadChunk; private NettyChunkInputStream chunkStream = new NettyChunkInputStream(); private NettyRequestContext context; private volatile Map<St