日期:2014-05-16  浏览次数:20514 次

Hbase 源码分析4 - Get 流程及rpc原理

分析版本为hbase 0.94

附上趋势团队画的图:

rpc角色表:

HBase通信信道 HBase的通信接口
客户端 服务端
HBase Client Master Server HMasterInterface
HBase Client Region Server HRegionInterface
Region Server Master Server HMasterRegionInterface

 

客户端发起请求:

htable.get(Get)

 

public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
}.withRetries();
}

 

调用get方法后,客户端进入睡眠,睡眠时间为pause * HConstants.RETRY_BACKOFF[ntries];  

pause= HBASE_CLIENT_PAUSE(1秒)

RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };

有结果则中断执行返回rpc结果,否则重试十次(默认DEFAULT_HBASE_CLIENT_RETRIES_NUMBER=10)

通过HConnectionManager的getHRegionConnection方法获取连接

通过HRegionServer的get方法获取结果

 

服务器端:

当regionserver 收到来自客户端的Get请求时,调用接口 

public Result get(byte[] regionName, Get get) 
{ 
... 
HRegion region = getRegion(regionName); 
return region.get(get, getLockFromId(get.getLockId())); 
... 
} 



在HRegion中

Scan scan = new Scan(get);
会先根据设置的columnFamily存放familyMap对  ----  columnFamily:null

public Get addFamily(byte [] family) {
familyMap.remove(family);
familyMap.put(family, null);
return this;
}

 

如果查询的family不在htableDescriptor中,返回错误

scanner = getScanner(scan);
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}

 

additionalScanners为null 所以在RegionScannerImpl的构造中只会使用StoreScanner


return instantiateRegionScanner(scan, additionalScanners);
return new RegionScannerImpl(scan, additionalScanners);

RegionScannerImpl 是 HRegion中的子类

 

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}

 

按照familyMap的数量存放对应数量的 StoreScanner

Hregion initialize时会对应每个columnFamily存放一个stores
Future<Store> future = completionService.take();
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);

scanners 添加从Store中获取的scanner

store.getScanners(cacheBlocks, isGet,
isCompaction, matcher)

Store 类:

memStoreScanners = this.memstore.getScanners();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueS