? ? Apache curator-recipes组件提供了大量已经"生产化"(produced)的特性,极大的简化了使用zk的复杂度.
? ? 1. Cache: 提供了对一个Node持续监听,如果节点数据变更,即可立即得到响应. 开发者无需过度的关注watcher和Event操作.
? ? 2. Queues: 提供了重量级的分布式队列解决方案,比如:权重队列,可延迟队列等.其实Zookeeper并不适合作为数据存储系统,你可以适度的使用它来达成分布式队列的设计要求.
? ? 3. Counters: 全局计数器是分布式设计中很常用的,包括"全局计算器"和"原子自增计数器".
? ? 4. Locks: 分布式锁的设计有很多手段,此组件提供了分布式"读写分写锁"/"共享锁"等.在我们需要控制资源访问的情况下,非常有用.
? ? 5. Barries: 栅栏,需要对分布式环境中,多个操作(进程)进行同步或者协同时,可以考虑使用barries;
? ? 6. Elections: 选举;可以在多个"注册者"之间选举出leader,作为操作调度/任务监控/队列消费的执行者,我们在设计"leader角色选举"/"单点任务执行""分布式队列消费者"等场景时,非常有效.
?
? ? 代码实例
1. 创建Client
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); //fluent style String namespace = "cluster-worker"; CuratorFramework client = builder.connectString("127.0.0.1:2181") .sessionTimeoutMs(30000) .connectionTimeoutMs(30000) .canBeReadOnly(false) .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)) .namespace(namespace) .defaultData(null) .build(); client.start(); EnsurePath ensure = client.newNamespaceAwareEnsurePath(namespace); //code for test Thread.sleep(5000); //client.close()//
?
2. Caches
? ? 持续watcher节点,并将节点的数据变更即时的在本地反应出来.recpise提供了PathChildrenCache和NodeCache两个API.
public static PathChildrenCache pathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception { final PathChildrenCache cached = new PathChildrenCache(client, path, cacheData); cached.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent.Type eventType = event.getType(); switch (eventType) { case CONNECTION_RECONNECTED: cached.rebuild(); break; case CONNECTION_SUSPENDED: case CONNECTION_LOST: System.out.println("Connection error,waiting..."); break; default: System.out.println("Data:" + event.getData().toString()); } } }); return cached; }
?
PathChildrenCache cached = pathChildrenCache(client,path,true); //= start() + rebuild() //事件操作,将会在额外的线程中执行. cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); List<ChildData> childData = cached.getCurrentData(); if (childData != null) { for (ChildData data : childData) { System.out.println("Path:" + data.getPath() + ",data" + new String(data.getData(), "utf-8")); } } //当不在需要关注此节点数据时,需要及时的关闭它. //因为每个cached,都会额外的消耗一个线程. //cached.close();////close the watcher,clear the cached Data
? ? 对于PathChildrenCache.getCurrentData()将从获取本地的数据列表,而不是触发一次zookeeper.getChildren(),因此为"Cache".
3. Queues:分布式队列
? ? 分布式队列的基本特性,就是"生产者"或"消费者"跨越多个进程,且在此种环境中需要确保队列的push/poll的有序性.
? ? zookeeper本身并没有提供分布式队列的实现,只是recipse根据zookeeper的watcher和具有version标记的node,来间接的实现分布式queue..内部机制如下:
? ? --> 如果是消费者(QueueConsumer),会创建一个类似于PathChildrenCache的实例用于监听queuePath下的子节点变更事件(单独的线程中).同时consumer处于阻塞状态,当有子节点变更事件时会被唤醒(包括创建子节点/删除子节点等);
? ? --> 此时consumer获取子节点列表,并将每个节点信息封装成Runnable任务单元,提交到线程池中.?