日期:2014-05-17 浏览次数:20992 次
package org.apache.hadoop.io; import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; public final class WritableUtils { /** * 压缩数据流 -> 解压后bytes * 数据流向: DataInput -> buffer (byte[]) -> ByteArrayInputStream -> GZIPInputStream * -> outbuf (byte[], decompressed) -> ByteOutputStream (memory) * -> memoryToBytesArray * 因为解压后的bytes大小未知,因此利用了ByteOutputStream自带的缓冲区来保存解压后的bytes */ public static byte[] readCompressedByteArray(DataInput in) throws IOException { int length = in.readInt(); if (length == -1) return null; byte[] buffer = new byte[length]; in.readFully(buffer); // could/should use readFully(buffer,0,length)? GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length)); byte[] outbuf = new byte[length]; ByteArrayOutputStream bos = new ByteArrayOutputStream(); int len; while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){ bos.write(outbuf, 0, len); } byte[] decompressed = bos.toByteArray(); bos.close(); gzi.close(); return decompressed; } public static void skipCompressedByteArray(DataInput in) throws IOException { int length = in.readInt(); if (length != -1) { skipFully(in, length); } } /** * bytes -> 压缩后输出到DataOutput * 数据流向: bytes -> GZIPOutputStream -> ByteArrayOutputStream (memory) -> buffer (memoryToBytesArray) * -> DataOutput (先写入压缩字节数,再写入buffer) */ public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException { if (bytes != null) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); GZIPOutputStream gzout = new GZIPOutputStream(bos); gzout.write(bytes, 0, bytes.length); gzout.close(); byte[] buffer = bos.toByteArray(); int len = buffer.length; out.writeInt(len); out.write(buffer, 0, len); /* debug only! Once we have confidence, can lose this. */ return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0); } else { out.writeInt(-1); return -1; } } /* * 直接将从DataInput in里的输入数据流解压缩后,以UTF-8形式解析到String中 */ /* Ugly utility, maybe someone else can do this better */ public static String readCompressedString(DataInput in) throws IOException { byte[] bytes = readCompressedByteArray(in); if (bytes == null) return null; return new String(bytes, "UTF-8"); } /* * 先将String s以UTF-8形式变成bytes,然后压缩写入DataOutput */ public static int writeCompressedString(DataOutput out, String s) throws IOException { return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null); } /* * * Write a String as a Network Int n, followed by n Bytes * Alternative to 16 bit read/writeUTF. * Encoding standard is... ? * */ public static void writeString(DataOutput out, String s) throws IOException { if (s != null) { byte[] buffer = s.getBytes("UTF-8"); int len = buffer.length; out.writeInt(len); out.write(buffer, 0, len); } else { out.writeInt(-1); } } /* * Read a String as a Network Int n, followed by n Bytes * Alternative to 16 bit read/writeUTF. * Encoding standard is... ? * */ public static String readString(DataInput in) throws IOException{ int length = in.readInt(); if (length == -1) return null; byte[] buffer = new byte[length]; in.readFully(buffer); // could/should use readFully(buffer,0,length)? return new String(buffer,"UTF-8"); } /* * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. * Could be generalised using introspection. * */ public static vo