I. Function Diagram
1.Put CompactionRequest to the compaction queue
2.Get CompactionRequest from the compaction queue
II. Detailed Walkthrough
1.Put CompactionRequest to the compaction queue
1.1.HRegionServer
1.run()
1.1.preRegistrationInitialization()
Do pre-registration initializations; zookeeper, lease threads, etc.
1.1.1.initializeThreads()
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);

// Background thread to check for compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY + ".multiplier", 1000);
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency * multiplier, this);
hbase.server.thread.wakefrequency.multiplier = 1000
hbase.server.thread.wakefrequency = 10 * 1000
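The two values above multiply to give the CompactionChecker period. A minimal sketch of that arithmetic follows; the class, method, and constant names are illustrative and are not HBase identifiers, and the values are the ones quoted in these notes, which a deployment may override.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: reproduces the arithmetic behind the CompactionChecker period.
class CheckerPeriodSketch {
    // Values quoted in the notes; a real cluster may configure them differently.
    static final int THREAD_WAKE_FREQUENCY_MS = 10 * 1000;
    static final int MULTIPLIER = 1000;

    static long periodMillis(int wakeFrequencyMs, int multiplier) {
        // Widen to long before multiplying so large settings cannot overflow int.
        return (long) wakeFrequencyMs * multiplier;
    }

    public static void main(String[] args) {
        long period = periodMillis(THREAD_WAKE_FREQUENCY_MS, MULTIPLIER);
        // prints "10000000 ms = 10000 s"
        System.out.println(period + " ms = " + TimeUnit.MILLISECONDS.toSeconds(period) + " s");
    }
}
```

With these values the checker wakes up roughly every 2 hours 47 minutes.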
1.2.CompactionChecker
Inner class that runs on a long period, checking if regions need compaction.
1.CompactionChecker()
sleepTime = 10000s (threadWakeFrequency * multiplier = 10,000 ms * 1000)
LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
hbase.regionserver.compactionChecker.majorCompactPriority = Integer.MAX_VALUE
2.chore()
CompactSplitThread.requestCompaction()
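One pass of chore() can be pictured as a loop over stores that forwards the ones needing work to requestCompaction(). The sketch below is a simplified model, not the HBase code: `StoreModel`, its `needsCompaction` rule (more files than a minimum), and the returned list of names are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one CompactionChecker pass; these are not the HBase classes.
class ChoreSketch {
    // Hypothetical stand-in for a Store that only tracks how many files it holds.
    static final class StoreModel {
        final String name;
        final int storeFileCount;

        StoreModel(String name, int storeFileCount) {
            this.name = name;
            this.storeFileCount = storeFileCount;
        }

        // Assumption: a store wants compaction once it holds more files than the minimum.
        boolean needsCompaction(int minFilesToCompact) {
            return storeFileCount > minFilesToCompact;
        }
    }

    // One chore() pass: names of the stores that would be handed to requestCompaction().
    static List<String> chore(List<StoreModel> stores, int minFilesToCompact) {
        List<String> requested = new ArrayList<>();
        for (StoreModel s : stores) {
            if (s.needsCompaction(minFilesToCompact)) {
                requested.add(s.name);
            }
        }
        return requested;
    }

    public static void main(String[] args) {
        List<StoreModel> stores = new ArrayList<>();
        stores.add(new StoreModel("cf1", 2));
        stores.add(new StoreModel("cf2", 5));
        System.out.println(chore(stores, 3)); // prints [cf2]
    }
}
```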
1.3.CompactSplitThread
1.requestCompaction()
1.1.
CompactionRequest cr = s.requestCompaction(priority, request);
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
    ? largeCompactions : smallCompactions;
pool.execute(cr);
if (LOG.isDebugEnabled()) {
  String type = (pool == smallCompactions) ? "Small " : "Large ";
  LOG.debug(type + "Compaction requested: " + cr
      + (why != null && !why.isEmpty() ? "; Because: " + why : "")
      + "; " + this);
}
1.2.Store.requestCompaction()
1.3.Store.throttleCompaction()
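The routing step in requestCompaction() amounts to choosing one of two executors by request size. Below is a minimal, self-contained sketch of that idea using plain `java.util.concurrent` pools; `PoolRoutingSketch`, the single-thread pool sizes, and the returned label are assumptions for illustration, not the actual CompactSplitThread implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative two-pool router: big requests go to one pool, the rest to another.
class PoolRoutingSketch {
    // Pool sizes of 1 are an assumption made for this sketch.
    private final ExecutorService largeCompactions = Executors.newFixedThreadPool(1);
    private final ExecutorService smallCompactions = Executors.newFixedThreadPool(1);
    private final long throttlePoint;

    PoolRoutingSketch(long throttlePoint) {
        this.throttlePoint = throttlePoint;
    }

    // Same comparison as the throttleCompaction() quoted in these notes.
    boolean throttleCompaction(long compactionSize) {
        return compactionSize > throttlePoint;
    }

    // Submits the work to the chosen pool and reports which pool was used.
    String requestCompaction(long compactionSize, Runnable work) {
        boolean large = throttleCompaction(compactionSize);
        ExecutorService pool = large ? largeCompactions : smallCompactions;
        pool.execute(work);
        return large ? "Large" : "Small";
    }

    // Lets queued work finish, then allows the worker threads to exit.
    void shutdown() {
        largeCompactions.shutdown();
        smallCompactions.shutdown();
    }

    public static void main(String[] args) {
        PoolRoutingSketch router = new PoolRoutingSketch(100L);
        System.out.println(router.requestCompaction(50L, () -> {}));  // prints Small
        System.out.println(router.requestCompaction(500L, () -> {})); // prints Large
        router.shutdown();
    }
}
```

Keeping two pools means a long-running large compaction does not block small ones queued behind it.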
1.4.Store
1.requestCompaction()
1.1.
// candidates = all storefiles not already in compaction queue
CompactSelection filesToCompact;
filesToCompact = compactSelection(candidates, priority);
1.2.compactSelection()
Algorithm to choose which files to compact
References:
    compaction configuration parameters
    HBase's algorithm for selecting store files for compaction
1.3.isMajor
// major compaction if all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
  // since we're enqueuing a major, update the compaction wait interval
  this.forceMajor = false;
}
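The isMajor test compares only sizes. That is sufficient because the selected files are drawn from the store's own files, so equal counts imply that every file was selected. A minimal sketch under that assumption, with hypothetical names and file labels:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative check: a selection is "major" when it covers every store file.
class MajorCheckSketch {
    // Assumes filesToCompact is a subset of allStoreFiles, so comparing sizes is enough.
    static boolean isMajor(List<String> filesToCompact, List<String> allStoreFiles) {
        return filesToCompact.size() == allStoreFiles.size();
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("f1", "f2", "f3");
        System.out.println(isMajor(Arrays.asList("f2", "f3"), all)); // prints false
        System.out.println(isMajor(all, all));                       // prints true
    }
}
```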
2.throttleCompaction()
boolean throttleCompaction(long compactionSize) {
  long throttlePoint = conf.getLong(
      "hbase.regionserver.thread.compaction.throttle",
      2 * this.minFilesToCompact * this.region.memstoreFlushSize);
  return compactionSize > throttlePoint;
}
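To get a feel for the fallback throttle point, the formula can be evaluated with sample numbers. The sketch below assumes minFilesToCompact = 3 and a 128 MB memstore flush size; both are example values chosen for illustration and may not match the defaults of a given HBase version.

```java
// Illustrative arithmetic for the fallback throttle point: 2 * minFiles * flushSize.
class ThrottlePointSketch {
    static long throttlePoint(int minFilesToCompact, long memstoreFlushSize) {
        // 2L forces long arithmetic so the product cannot overflow int.
        return 2L * minFilesToCompact * memstoreFlushSize;
    }

    static boolean throttleCompaction(long compactionSize, long throttlePoint) {
        return compactionSize > throttlePoint;
    }

    public static void main(String[] args) {
        long flushSize = 128L * 1024 * 1024;      // assumed example: 128 MB
        long point = throttlePoint(3, flushSize); // assumed example: 3 files
        System.out.println(point);                // prints 805306368 (768 MB)
        System.out.println(throttleCompaction(1024L * 1024 * 1024, point)); // prints true
    }
}
```

Under these sample values, a request larger than 768 MB would go to the large-compaction pool.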
2.Get CompactionRequest from the compaction queue
2.1.CompactionRequest
1.run()
1.1.
long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = r.compact(this);
long now =