日期:2014-05-16  浏览次数:20779 次

Netty中使用Apache Commons FileUpload
/**
 * 用Netty来实现上传
 */
public class NettyFileUpload extends FileUpload {

	private NettyRequestContext context;

	public static final boolean isMultipartContent(HttpRequest request) {
		if (HttpMethod.POST != request.getMethod()) {
			return false;
		}
		if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) {
			return false;
		}
		String contentType = request.getHeaders("Content-Type").get(0);
		if (contentType == null) {
			return false;
		}
		if (contentType.toLowerCase().startsWith("multipart/")) {
			return true;
		}
		return false;
	}

	public NettyFileUpload(NettyRequestContext context) {
		this.context = context;
	}

	public NettyFileUpload(FileItemFactory fileItemFactory) {
		super(fileItemFactory);
	}

	public FileItemIterator getItemIterator() throws FileUploadException, IOException {
		return super.getItemIterator(context);
	}
}


/**
 * {@link RequestContext} whose values are all supplied through the constructor.
 */
public class NettyRequestContext implements RequestContext {
	/** Value returned by {@link #getCharacterEncoding()}. */
	private final String encoding;
	/** Value returned by {@link #getContentType()}. */
	private final String contentType;
	/** Value returned by {@link #getContentLength()}. */
	private final int contentLength;
	/**
	 * Stream carrying the uploaded content.
	 */
	private final InputStream inputStream;

	/**
	 * Stores the given values unchanged.
	 *
	 * @param encoding      character encoding to report
	 * @param contentType   content type to report
	 * @param contentLength content length to report
	 * @param inputStream   stream carrying the uploaded content
	 */
	public NettyRequestContext(String encoding, String contentType,
			int contentLength, InputStream inputStream) {
		this.inputStream = inputStream;
		this.contentLength = contentLength;
		this.contentType = contentType;
		this.encoding = encoding;
	}

	@Override
	public String getCharacterEncoding() {
		return this.encoding;
	}

	@Override
	public String getContentType() {
		return this.contentType;
	}

	@Override
	public int getContentLength() {
		return this.contentLength;
	}

	@Override
	public InputStream getInputStream() throws IOException {
		// The request's own stream cannot be used directly because of HttpChunk.
		return this.inputStream;
	}

	/**
	 * @return the content length and content type in a readable form
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("ContentLength=");
		text.append(getContentLength());
		text.append(", ContentType=");
		text.append(getContentType());
		return text.toString();
	}

	/**
	 * Closes the stream returned by {@link #getInputStream()}.
	 *
	 * @throws IOException if closing the stream fails
	 */
	public void closeInputStream() throws IOException {
		InputStream stream = getInputStream();
		stream.close();
	}
}


/**
 * {@link InputStream} that exposes the content of a sequence of
 * {@link HttpChunk}s as a byte stream.
 * <p>
 * Chunks are handed over with {@link #putChunk(HttpChunk)} and consumed by
 * {@link #read()}. They travel through a bounded blocking queue, so
 * {@code putChunk} blocks while the queue is full and {@code read} blocks
 * while it is empty. The stream ends once a chunk reporting
 * {@link HttpChunk#isLast()} has been reached and has no readable content.
 */
public class NettyChunkInputStream extends InputStream {

	/** Message of the exception thrown when the stream is used after close(). */
	private static final String CLOSED_MESSAGE = " this inputstream has been closed!";

	/**
	 * Hand-over queue. It is never replaced or nulled, so a put or take that
	 * runs concurrently with close() cannot hit a null reference.
	 */
	private final BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);

	/** Chunk currently being read; {@code null} until the first read. */
	private HttpChunk currentChunk = null;

	private volatile boolean closed;

	/**
	 * Queues a chunk for reading, waiting for space if the queue is full.
	 *
	 * @param chunk the chunk to append
	 * @return always {@code true}
	 * @throws IOException if the stream has been closed, or if the calling
	 *         thread is interrupted while waiting (the interrupt flag is
	 *         restored in that case)
	 */
	public boolean putChunk(HttpChunk chunk) throws IOException {
		if (closed) {
			throw new IOException(CLOSED_MESSAGE);
		}
		try {
			chunkQueue.put(chunk);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IOException(e);
		}
		return true;
	}

	/**
	 * Reads the next byte, waiting for another chunk when the current one is
	 * exhausted.
	 *
	 * @return the byte as a value in the range 0-255, or -1 once the last
	 *         chunk has no more readable content
	 * @throws IOException if the stream has been closed, or if the calling
	 *         thread is interrupted while waiting (the interrupt flag is
	 *         restored in that case)
	 */
	@Override
	public int read() throws IOException {
		if (closed) {
			throw new IOException(CLOSED_MESSAGE);
		}
		try {
			HttpChunk chunk = getChunk();
			// Skip every exhausted or empty chunk; only an exhausted LAST chunk
			// means end of stream, an empty intermediate chunk does not.
			while (!chunk.getContent().readable()) {
				if (chunk.isLast()) {
					return -1;
				}
				nextChunk();
				chunk = currentChunk;
			}
			// InputStream.read() must return an int in the range 0-255.
			return chunk.getContent().readByte() & 0xFF;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IOException(e);
		}
	}

	/** Returns the current chunk, taking the first one from the queue if needed. */
	private HttpChunk getChunk() throws InterruptedException {
		if (currentChunk == null) {
			currentChunk = chunkQueue.take();
		}

		return currentChunk;
	}

	/** Replaces the current chunk with the next queued one, waiting if necessary. */
	private void nextChunk() throws InterruptedException {
		currentChunk = chunkQueue.take();
	}

	/**
	 * Returns the number of bytes left in the chunk currently being read.
	 * Chunks still waiting in the queue are not counted.
	 *
	 * @return readable bytes of the current chunk, or 0 if no chunk has been
	 *         taken yet
	 */
	@Override
	public int available() throws IOException {
		HttpChunk chunk = currentChunk;
		return chunk == null ? 0 : chunk.getContent().readableBytes();
	}

	/**
	 * Marks the stream as closed and discards the chunks that are still queued.
	 * Later calls to {@link #putChunk(HttpChunk)} and {@link #read()} fail with
	 * an {@link IOException}.
	 */
	@Override
	public void close() throws IOException {
		// Set the flag first so that new callers are rejected before the queue
		// is emptied.
		closed = true;
		chunkQueue.clear();
	}

	/**
	 * @return {@code true} once {@link #close()} has been called
	 */
	public boolean isClosed() {
		return closed;
	}

}

应用:




public class NettyUploadHandler extends SimpleChannelUpstreamHandler {
	private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32);
	private boolean hasReadChunk;
	private NettyChunkInputStream chunkStream = new NettyChunkInputStream();
	private NettyRequestContext context;

	private volatile Map<St