日期:2014-05-16  浏览次数:20448 次

HBase disable table的过程

hbase的table delete之前需要将该table disable,今天结合源码分析一下disable的过程

?

首先看客户端HbaseAdmin.java中有接口

public void disableTable(final byte [] tableName)
public void disableTable(final String tableName)

?它们的实现都是

? public void disableTableAsync(final byte [] tableName) throws IOException {
??? isMasterRunning();
??? try {
????? getMaster().disableTable(tableName);
??? } catch (RemoteException e) {
????? throw e.unwrapRemoteException();
??? }
??? LOG.info("Started disable of " + Bytes.toString(tableName));
? }

这段代码的主要过程是通过rpc远程调用执行HMaster的disableTable();

?所以我直接看HMaster的disableTable函数

public void disableTable(final byte [] tableName) throws IOException {
    this.executorService.submit(new DisableTableHandler(this, tableName,
      catalogTracker, assignmentManager));
  }

?起了一个新线程DisableTableHadnler

首先判断meta表中是否存在该table,扫描meta表,若meta表不存在table则返回,抛出异常tablenotexist

该线程主要调用

private void handleDisableTable()

?将zk上面的/table/?tablename标识为DISABLING

 this.assignmentManager.getZKTable().setDisablingTable(this.tableNameStr);
boolean done = false;
    while (true) {
      // Get list of online regions that are of this table.  Regions that are
      // already closed will not be included in this list; i.e. the returned
      // list is not ALL regions in a table, its all online regions according to
      // the in-memory state on this master.
      final List<HRegionInfo> regions =
        this.assignmentManager.getRegionsOfTable(tableName);
      if (regions.size() == 0) {
        done = true;
        break;
      }
      LOG.info("Offlining " + regions.size() + " regions.");
      BulkDisabler bd = new BulkDisabler(this.server, regions);
      try {
        if (bd.bulkAssign()) {
          done = true;
          break;
        }
      } catch (InterruptedException e) {
        LOG.warn("Disable was interrupted");
        // Preserve the interrupt.
        Thread.currentThread().interrupt();
        break;
      }
    }
    // Flip the table to disabled if success.
    if (done) this.assignmentManager.getZKTable().setDisabledTable(this.tableNameStr);

?获取该table上的所有online的region集合。

对于这些regions? bulkassign:起20个线程对所有region进行unassigned

protected void populatePool(ExecutorService pool) {
      for (HRegionInfo region: regions) {
        if (assignmentManager.isRegionInTransition(region) != null) continue;
        final HRegionInfo hri = region;
        pool.execute(new Runnable() {
          public void run() {
            assignmentManager.unassign(hri);
          }
        });
      }
    }

?如上所示如果该Region已经在master的RIT队列中,则说明该region正在被处理,则忽略之

下面我们来看unassigned过程:

1.? 该region必须online,在master的online队列中存在

2.? 若该region在master的RIT队列中,且其state的状态是PENDING且force=true即表示即使该region正在closing ,force参数指定该region should be closed;

???? 若在RIT中且不符合上面条件则忽略返回;

???? 若不存在则将当前Region加入RIT队列中

3.? serverManager.sendRegionClose(server, state.getRegion()),Master端与该Region所在RS进行通信,RPC调用HRegionServer的closeRegion(region)函数

?

HRegionServer端

closeRegion(region)

1. 该region在RS的online regions列表上

2. 起一个线程CloseRegionhandler实现以下步骤

??? 1)setClosingState,在zk上创建unassigned下的该region节点将其值置为RS_ZK_REGION_CLOSING

??? 2)region.close(abort),关闭region,主要是关闭region下每个Store的每个HFile的reader

??? 3)将region从当前online列表中移除

??? 4)在zk上将该节点置为RS_ZK_REGION_CLOSED

????5)从RS的RIT队列中移除该region

Server端的主要逻辑即为如上所示

?

在上面过程中由于对zk上的unassigned节点进行了created和changed,且master端watch了该节点,当这些节点发生变化时会使得Master端也发生变化。

1)created时

  public void nodeCreated(String path) {
    if(p