I. Function Diagram
1.Put CompactionRequest to the compaction queue

2.Get CompactionRequest from the compaction queue

II. Function Details
1.Put CompactionRequest to the compaction queue
1.1.HRegionServer
1.run()
1.1.preRegistrationInitialization()
Do pre-registration initializations; zookeeper, lease threads, etc.
1.1.1.initializeThreads()
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
".multiplier", 1000);
this.compactionChecker = new CompactionChecker(this,
this.threadWakeFrequency * multiplier, this);
hbase.server.thread.wakefrequency.multiplier = 1000
hbase.server.thread.wakefrequency = 10 * 1000
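The two values above determine how often the CompactionChecker chore wakes up. The following is a minimal sketch of that arithmetic only, assuming the wake frequency is expressed in milliseconds; the class and method names are hypothetical and are not part of HBase.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not HBase code: reproduces the period arithmetic only.
class CompactionCheckerPeriod {

    // Period in milliseconds = thread wake frequency (ms) * multiplier.
    static long periodMillis(int threadWakeFrequencyMillis, int multiplier) {
        return (long) threadWakeFrequencyMillis * multiplier;
    }

    public static void main(String[] args) {
        // Values taken from the two configuration lines above.
        long period = periodMillis(10 * 1000, 1000);
        System.out.println(period + " ms = "
            + TimeUnit.MILLISECONDS.toSeconds(period) + " s");
        // prints "10000000 ms = 10000 s"
    }
}
```

With these values the checker runs roughly once every 2 hours 47 minutes, which is why it only matters for regions that have not received updates in a while.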
1.2.CompactionChecker
Inner class that runs on a long period checking if regions need compaction.
1.CompactionChecker()
sleepTime = 10000 s (10 * 1000 ms * 1000, roughly 2 h 47 min)
LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
hbase.regionserver.compactionChecker.majorCompactPriority = Integer.MAX_VALUE
2.chore()
CompactSplitThread.requestCompaction()
1.3.CompactSplitThread
1.requestCompaction()
1.1.
CompactionRequest cr = s.requestCompaction(priority, request);
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
? largeCompactions : smallCompactions;
pool.execute(cr);
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + cr
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this);
}
1.2.Store.requestCompaction()
1.3.Store.throttleCompaction()
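The routing step in requestCompaction() can be sketched outside HBase as follows. This is a simplified stand-in, not the CompactSplitThread implementation: the class name, the single-thread pools, and the fixed throttlePoint passed to the constructor are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-in for the pool selection shown above; not HBase code.
class PoolSelectionSketch {
    private final ExecutorService largeCompactions = Executors.newFixedThreadPool(1);
    private final ExecutorService smallCompactions = Executors.newFixedThreadPool(1);
    private final long throttlePoint; // size threshold in bytes (assumed fixed here)

    PoolSelectionSketch(long throttlePoint) {
        this.throttlePoint = throttlePoint;
    }

    // Mirrors: s.throttleCompaction(size) ? largeCompactions : smallCompactions
    ExecutorService choosePool(long compactionSize) {
        return compactionSize > throttlePoint ? largeCompactions : smallCompactions;
    }

    // Same label logic as the debug message in the original snippet.
    String poolName(long compactionSize) {
        return choosePool(compactionSize) == smallCompactions ? "Small" : "Large";
    }

    // Hands the work to the chosen pool, as pool.execute(cr) does.
    void request(long compactionSize, Runnable work) {
        choosePool(compactionSize).execute(work);
    }

    // Stops accepting new work; tasks already queued still run.
    void shutdown() {
        largeCompactions.shutdown();
        smallCompactions.shutdown();
    }

    public static void main(String[] args) {
        PoolSelectionSketch sketch = new PoolSelectionSketch(100L);
        for (long size : new long[] {50L, 100L, 101L}) {
            final String name = sketch.poolName(size);
            sketch.request(size,
                () -> System.out.println(name + " compaction of size " + size));
        }
        sketch.shutdown();
    }
}
```

Keeping two pools means a long-running large compaction cannot block the small ones queued behind it.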
1.4.Store
1.requestCompaction()
1.1.
// candidates = all storefiles not already in compaction queue
CompactSelection filesToCompact;
filesToCompact = compactSelection(candidates, priority);
1.2.compactSelection()
Algorithm to choose which files to compact
References:
    Compaction configuration parameters
    The algorithm HBase uses to select store files for compaction
// major compaction if all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
1.3.isMajor
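The isMajor test above compares only the number of selected files with the number of store files. A minimal sketch of that check, using plain strings as stand-ins for StoreFile objects (the class, method, and file names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the isMajor check; not HBase code.
class MajorCheckSketch {

    // A compaction counts as major when every store file was selected.
    static boolean isMajor(List<String> filesToCompact, List<String> storefiles) {
        return filesToCompact.size() == storefiles.size();
    }

    public static void main(String[] args) {
        List<String> storefiles = Arrays.asList("sf1", "sf2", "sf3");
        System.out.println(isMajor(Arrays.asList("sf2", "sf3"), storefiles)); // prints "false"
        System.out.println(isMajor(storefiles, storefiles));                  // prints "true"
    }
}
```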
2.throttleCompaction()
boolean throttleCompaction(long compactionSize) {
long throttlePoint = conf.getLong(
"hbase.regionserver.thread.compaction.throttle",
2 * this.minFilesToCompact * this.region.memstoreFlushSize);
return compactionSize > throttlePoint;
}
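To get a feel for the fallback threshold in throttleCompaction(), the sketch below evaluates 2 * minFilesToCompact * memstoreFlushSize for sample inputs. The values minFilesToCompact = 3 and memstoreFlushSize = 128 MB are assumptions chosen for illustration and are not taken from the snippet above; the class name is hypothetical.

```java
// Hypothetical stand-alone version of the throttle check; not HBase code.
class ThrottleSketch {

    // Fallback used when hbase.regionserver.thread.compaction.throttle is unset.
    static long defaultThrottlePoint(int minFilesToCompact, long memstoreFlushSize) {
        return 2L * minFilesToCompact * memstoreFlushSize;
    }

    // True means the request goes to the large-compaction pool.
    static boolean throttleCompaction(long compactionSize, long throttlePoint) {
        return compactionSize > throttlePoint;
    }

    public static void main(String[] args) {
        int minFilesToCompact = 3;                    // assumed value
        long memstoreFlushSize = 128L * 1024 * 1024;  // assumed value: 128 MB
        long throttlePoint = defaultThrottlePoint(minFilesToCompact, memstoreFlushSize);
        System.out.println(throttlePoint);            // prints "805306368" (768 MB)
        System.out.println(throttleCompaction(1024L * 1024 * 1024, throttlePoint)); // prints "true"
        System.out.println(throttleCompaction(throttlePoint, throttlePoint));       // prints "false"
    }
}
```

Note that the comparison is strict, so a compaction exactly at the threshold is still treated as small.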
2.Get CompactionRequest from the compaction queue
2.1.CompactionRequest
1.run()
1.1.
long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = r.compact(this);
long now = 