日期:2014-05-17  浏览次数:20975 次

org.apache.hadoop.io.WritableUtils简单分析


package org.apache.hadoop.io;

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public final class WritableUtils  {

	
	
/**
 * Reads a length-prefixed, GZIP-compressed byte array and returns the decompressed bytes.
 *
 * <p>Wire format: a 4-byte int holding the compressed length, followed by that many
 * bytes of GZIP data. A length of {@code -1} encodes {@code null}.
 *
 * <p>Data flow: DataInput -> compressed byte[] -> ByteArrayInputStream -> GZIPInputStream
 * -> scratch buffer -> ByteArrayOutputStream -> result byte[]. The decompressed size is
 * not known in advance, so the growable ByteArrayOutputStream accumulates the output.
 *
 * @param in source positioned at the length prefix
 * @return the decompressed bytes, or {@code null} if the length prefix is {@code -1}
 * @throws IOException if reading from {@code in} fails or the payload is not valid GZIP data
 */
public static byte[] readCompressedByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length == -1) return null;
    byte[] buffer = new byte[length];
    in.readFully(buffer);
    GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
    try {
      // Scratch buffer for one read; sized like the compressed input, as before.
      byte[] outbuf = new byte[length];
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      int len;
      while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) {
        bos.write(outbuf, 0, len);
      }
      // ByteArrayOutputStream.close() is a no-op, so it is not closed explicitly.
      return bos.toByteArray();
    } finally {
      // Close even when decompression fails so the stream's Inflater is released.
      gzi.close();
    }
}

  /**
   * Advances past a length-prefixed compressed byte array without decompressing it.
   *
   * <p>Reads the int length prefix; when it is {@code -1} (the marker for a null array)
   * nothing else is consumed. Otherwise the length is handed to {@code skipFully},
   * which presumably skips exactly that many bytes (its body is not shown here).
   *
   * @param in source positioned at the length prefix
   * @throws IOException if reading the prefix or skipping fails
   */
  public static void skipCompressedByteArray(DataInput in) throws IOException {
    final int compressedLength = in.readInt();
    if (compressedLength == -1) {
      return;
    }
    skipFully(in, compressedLength);
  }
  /**
   * GZIP-compresses a byte array and writes it to {@code out} with a length prefix.
   *
   * <p>Data flow: bytes -> GZIPOutputStream -> ByteArrayOutputStream -> compressed byte[]
   * -> DataOutput (the compressed length as an int first, then the compressed bytes).
   * A {@code null} array is written as the single int {@code -1}.
   *
   * @param out   destination for the length prefix and the compressed payload
   * @param bytes data to compress; may be {@code null}
   * @return the compressed size as a percentage of the original size (integer division),
   *         {@code 0} for an empty array, or {@code -1} when {@code bytes} is {@code null}
   * @throws IOException if compressing or writing to {@code out} fails
   */
  public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
    if (bytes == null) {
      out.writeInt(-1);
      return -1;
    }
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    GZIPOutputStream gzout = new GZIPOutputStream(bos);
    try {
      gzout.write(bytes, 0, bytes.length);
    } finally {
      // close() finishes the GZIP trailer and releases the Deflater, even on failure.
      gzout.close();
    }
    byte[] buffer = bos.toByteArray();
    int len = buffer.length;
    out.writeInt(len);
    out.write(buffer, 0, len);
    if (bytes.length == 0) {
      return 0;
    }
    // Multiply in long: 100 * len overflows int once len exceeds roughly 21 MB.
    return (int) ((100L * len) / bytes.length);
  }


  /**
   * Reads a compressed byte array from {@code in} via {@code readCompressedByteArray}
   * and decodes the resulting bytes as a UTF-8 string.
   *
   * @param in source passed straight to {@code readCompressedByteArray}
   * @return the decoded string, or {@code null} when the byte array read is {@code null}
   * @throws IOException if the underlying read fails
   */
  public static String readCompressedString(DataInput in) throws IOException {
    final byte[] decompressed = readCompressedByteArray(in);
    return (decompressed == null) ? null : new String(decompressed, "UTF-8");
  }

  /**
   * Encodes {@code s} as UTF-8 bytes and writes them via {@code writeCompressedByteArray}.
   * A {@code null} string is forwarded as a {@code null} byte array.
   *
   * @param out destination passed straight to {@code writeCompressedByteArray}
   * @param s   string to write; may be {@code null}
   * @return the value returned by {@code writeCompressedByteArray}
   * @throws IOException if encoding or the underlying write fails
   */
  public static int writeCompressedString(DataOutput out, String s) throws IOException {
    byte[] utf8 = null;
    if (s != null) {
      utf8 = s.getBytes("UTF-8");
    }
    return writeCompressedByteArray(out, utf8);
  }

  /**
   * Writes a string as an int byte count followed by that many UTF-8 encoded bytes.
   * An alternative to the 16-bit length used by read/writeUTF.
   *
   * <p>A {@code null} string is written as the single int {@code -1} with no payload.
   *
   * @param out destination for the length prefix and the encoded bytes
   * @param s   string to write; may be {@code null}
   * @throws IOException if encoding or writing to {@code out} fails
   */
  public static void writeString(DataOutput out, String s) throws IOException {
    if (s == null) {
      out.writeInt(-1);
      return;
    }
    final byte[] utf8 = s.getBytes("UTF-8");
    final int byteCount = utf8.length;
    out.writeInt(byteCount);
    out.write(utf8, 0, byteCount);
  }

  /**
   * Reads a string stored as an int byte count followed by that many UTF-8 encoded bytes.
   * An alternative to the 16-bit length used by read/writeUTF.
   *
   * @param in source positioned at the length prefix
   * @return the decoded string, or {@code null} when the length prefix is {@code -1}
   * @throws IOException if reading from {@code in} fails, including when fewer bytes
   *         are available than the prefix announces
   */
  public static String readString(DataInput in) throws IOException {
    final int byteCount = in.readInt();
    if (byteCount == -1) {
      return null;
    }
    final byte[] utf8 = new byte[byteCount];
    in.readFully(utf8);
    return new String(utf8, "UTF-8");
  }


  /*
   * Write a String array as a Network Int N, followed by Int N Byte Array Strings.
   * Could be generalised using introspection.
   *
   */
  public static vo