日期:2014-05-16 浏览次数:20396 次
private List<Path> splitLog(final FileStatus[] logfiles) throws IOException { List<Path> processedLogs = new ArrayList<Path>();//成功处理以后的文件放入这个目录下 List<Path> corruptedLogs = new ArrayList<Path>();//读取文件出错的放入这个目录下 List<Path> splits = null; boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true); long totalBytesToSplit = countTotalBytes(logfiles); splitSize = 0; outputSink.startWriterThreads(entryBuffers);//启动三个写线程,将内存中的数据按照region分别写入region下的recover.edits目录下 try { int i = 0; for (FileStatus log : logfiles) { Path logPath = log.getPath(); long logLength = log.getLen(); splitSize += logLength; LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length + ": " + logPath + ", length=" + logLength); Reader in; try { in = getReader(fs, log, conf, skipErrors); if (in != null) { parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);//读取文件写入内存entryBuffers try { in.close(); } catch (IOException e) { LOG.warn("Close log reader threw exception -- continuing", e); } } processedLogs.add(logPath); } catch (CorruptedLogFileException e) { LOG.info("Got while parsing hlog " + logPath + ". Marking as corrupted", e); corruptedLogs.add(logPath); continue; } } if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { throw new OrphanHLogAfterSplitException( "Discovered orphan hlog after split. Maybe the " + "HRegionServer was not dead when we started"); } archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);//把corrutedLogs里的path放入到.corruptedlogs上,把processedLogs上的path移到oldlog上,并删除HLog } finally { LOG.info("Finishing writing output logs and closing down."); splits = outputSink.finishWritingAndClose(); } return splits; }
/**
 * Drains {@code in} by calling {@code getNextLogLine} until it returns
 * {@code null}, appending every entry to {@code entryBuffers}.
 *
 * <p>The number of entries appended is logged at debug level when the method
 * exits, whether normally or with an exception.
 *
 * @param in           reader to pull entries from
 * @param path         path of the log being read; passed to
 *                     {@code getNextLogLine} and used in the debug message
 * @param entryBuffers destination for the entries
 * @param fs           not used by this method
 * @param conf         not used by this method
 * @param skipErrors   forwarded to {@code getNextLogLine}
 * @throws IOException an {@link InterruptedIOException} (with the original
 *                     {@link InterruptedException} as its cause) if the thread
 *                     is interrupted, or any I/O failure from the calls made here
 * @throws CorruptedLogFileException propagated from the calls made here
 */
private void parseHLog(final Reader in, Path path, EntryBuffers entryBuffers,
    final FileSystem fs, final Configuration conf, boolean skipErrors)
    throws IOException, CorruptedLogFileException {
  int pushed = 0;
  try {
    while (true) {
      Entry next = getNextLogLine(in, path, skipErrors);
      if (next == null) {
        break;
      }
      entryBuffers.appendEntry(next);
      pushed++;
    }
  } catch (InterruptedException interrupted) {
    // Surface the interruption through the method's IOException contract.
    InterruptedIOException wrapped = new InterruptedIOException();
    wrapped.initCause(interrupted);
    throw wrapped;
  } finally {
    LOG.debug("Pushed=" + pushed + " entries from " + path);
  }
}
private void doRun() throws IOException { LOG.debug("Writer thread " + this + ": starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { // No data currently available, wait on some more to show up synchronized (dataAvailable) { if (shouldStop) return; try { dataAvailable.wait(1000); } catch (InterruptedException ie) { if (!shouldStop) { throw new RuntimeException(ie); } } } continue; } assert buffer != null; try { writeBuffer(buffer); } finally { entryBuffers.doneWriting(buffer); } } }