日期:2014-05-16  浏览次数:20382 次

HBase splitlog 过程

上一篇Blog提到了HBase在regionserver挂掉以后,master会处理,其中很重要的一步是就是splitlog,把.logs目录下的该rs的文件夹里的HLog文件,按照region进行分配。splitlog的代码如下所示:

private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
    List<Path> processedLogs = new ArrayList<Path>();//成功处理以后的文件放入这个目录下
    List<Path> corruptedLogs = new ArrayList<Path>();//读取文件出错的放入这个目录下
    List<Path> splits = null;

    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);

    long totalBytesToSplit = countTotalBytes(logfiles);
    splitSize = 0;

    outputSink.startWriterThreads(entryBuffers);//启动三个写线程,将内存中的数据按照region分别写入region下的recover.edits目录下

    try {
      int i = 0;
      for (FileStatus log : logfiles) {
       Path logPath = log.getPath();
        long logLength = log.getLen();
        splitSize += logLength;
        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
            + ": " + logPath + ", length=" + logLength);
        Reader in;
        try {
          in = getReader(fs, log, conf, skipErrors);
          if (in != null) {
            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);//读取文件写入内存entryBuffers
            try {
              in.close();
            } catch (IOException e) {
              LOG.warn("Close log reader threw exception -- continuing",
                  e);
            }
          }
          processedLogs.add(logPath);
        } catch (CorruptedLogFileException e) {
          LOG.info("Got while parsing hlog " + logPath +
              ". Marking as corrupted", e);
          corruptedLogs.add(logPath);
          continue;
        }
      }
      if (fs.listStatus(srcDir).length > processedLogs.size()
          + corruptedLogs.size()) {
        throw new OrphanHLogAfterSplitException(
            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
      }
      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);//把corrutedLogs里的path放入到.corruptedlogs上,把processedLogs上的path移到oldlog上,并删除HLog
    } finally {
      LOG.info("Finishing writing output logs and closing down.");
      splits = outputSink.finishWritingAndClose();
    }
    return splits;
  }

?ParseHLog过程很简单从文件中读取数据写入到内存中,一次最多128M

  private void parseHLog(final Reader in, Path path,
		EntryBuffers entryBuffers, final FileSystem fs,
    final Configuration conf, boolean skipErrors)
	throws IOException, CorruptedLogFileException {
    int editsCount = 0;
    try {
      Entry entry;
      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
        entryBuffers.appendEntry(entry);
        editsCount++;
      }
    } catch (InterruptedException ie) {
      IOException t = new InterruptedIOException();
      t.initCause(ie);
      throw t;
    } finally {
      LOG.debug("Pushed=" + editsCount + " entries from " + path);
    }
  }

?写线程也比较简单,每个线程从entryBuffer中获取一个region的一块数据,在一个entrBuffer中,一个region只能由一个线程来handler,不然会有多个写线程同时对一个文件进行操作。

 private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            if (shouldStop) return;
            try {
              dataAvailable.wait(1000);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

?

?