日期:2014-05-16  浏览次数:21284 次

Apache curator-recipes代码实例

? ? Apache curator-recipes组件提供了大量已经"生产化"(produced)的特性,极大的简化了使用zk的复杂度.

? ? 1. Cache: 提供了对一个Node持续监听,如果节点数据变更,即可立即得到响应. 开发者无需过度的关注watcher和Event操作.

? ? 2. Queues: 提供了重量级的分布式队列解决方案,比如:权重队列,可延迟队列等.其实Zookeeper并不适合作为数据存储系统,你可以适度的使用它来达成分布式队列的设计要求.

? ? 3. Counters: 全局计数器是分布式设计中很常用的,包括"全局计算器"和"原子自增计数器".

? ? 4. Locks: 分布式锁的设计有很多手段,此组件提供了分布式"读写分写锁"/"共享锁"等.在我们需要控制资源访问的情况下,非常有用.

? ? 5. Barries: 栅栏,需要对分布式环境中,多个操作(进程)进行同步或者协同时,可以考虑使用barries;

? ? 6. Elections: 选举;可以在多个"注册者"之间选举出leader,作为操作调度/任务监控/队列消费的执行者,我们在设计"leader角色选举"/"单点任务执行""分布式队列消费者"等场景时,非常有效.

?

? ? 代码实例

1. 创建Client

CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
		.sessionTimeoutMs(30000)
		.connectionTimeoutMs(30000)
		.canBeReadOnly(false)
		.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
		.namespace(namespace)
		.defaultData(null)
		.build();
client.start();
EnsurePath ensure = client.newNamespaceAwareEnsurePath(namespace);
//code for test
Thread.sleep(5000);
//client.close()//

?

2. Caches

? ? 持续watcher节点,并将节点的数据变更即时的在本地反应出来.recpise提供了PathChildrenCache和NodeCache两个API.

public static PathChildrenCache pathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
	final PathChildrenCache cached = new PathChildrenCache(client, path, cacheData);
	cached.getListenable().addListener(new PathChildrenCacheListener() {
		@Override
		public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
			PathChildrenCacheEvent.Type eventType = event.getType();
			switch (eventType) {
				case CONNECTION_RECONNECTED:
					cached.rebuild();
					break;
				case CONNECTION_SUSPENDED:
				case CONNECTION_LOST:
					System.out.println("Connection error,waiting...");
					break;
				default:
					System.out.println("Data:" + event.getData().toString());
			}
		}
	});
	return cached;
}

?

PathChildrenCache cached = pathChildrenCache(client,path,true);
//= start() + rebuild()
//事件操作,将会在额外的线程中执行.
cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
List<ChildData> childData = cached.getCurrentData();
if (childData != null) {
	for (ChildData data : childData) {
		System.out.println("Path:" + data.getPath() + ",data" + new String(data.getData(), "utf-8"));
	}
}

//当不在需要关注此节点数据时,需要及时的关闭它.
//因为每个cached,都会额外的消耗一个线程.
//cached.close();////close the watcher,clear the cached Data

? ? 对于PathChildrenCache.getCurrentData()将从获取本地的数据列表,而不是触发一次zookeeper.getChildren(),因此为"Cache".

3. Queues:分布式队列

? ? 分布式队列的基本特性,就是"生产者"或"消费者"跨越多个进程,且在此种环境中需要确保队列的push/poll的有序性.

? ? zookeeper本身并没有提供分布式队列的实现,只是recipse根据zookeeper的watcher和具有version标记的node,来间接的实现分布式queue..内部机制如下:

? ? --> 如果是消费者(QueueConsumer),会创建一个类似于PathChildrenCache的实例用于监听queuePath下的子节点变更事件(单独的线程中).同时consumer处于阻塞状态,当有子节点变更事件时会被唤醒(包括创建子节点/删除子节点等);

? ? --> 此时consumer获取子节点列表,并将每个节点信息封装成Runnable任务单元,提交到线程池中.?