日期:2014-05-16  浏览次数:20978 次

【Apache ZooKeeper】为ZNode设置watcher

       众所周知,ZooKeeper中的ZNode是树形结构,现在我需要给/app1结点设置watcher,监听/app1下增加、删除和修改的结点,并将相应的事件使用log4j记录到日志文件中。ZNode的变化类型可以直接通过event.getType()来获取。使用zk.exists(PATH, wc);来为PATH结点设置watcher,所有结点都可以使用同一个wc做watcher。 

       代码如下:

package com.iflytek.cpcloud.zookeeper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class WatchClient implements Runnable {

	private static final Log LOG = LogFactory.getLog(WatchClient.class);
	public static final int CLIENT_PORT = 2181;
	public static final String PATH = "/app1";// 所要监控的结点
	private static ZooKeeper zk;
	private static List<String> nodeList;// 所要监控的结点的子结点列表

	public static void main(String[] args) throws Exception {
		PropertyConfigurator.configure("F:\\test\\conf\\log4j.properties");
		WatchClient client = new WatchClient();
		Thread th = new Thread(client);
		th.start();
	}

	public WatchClient() throws IOException {

		zk = new ZooKeeper("192.168.255.133:" + CLIENT_PORT, 21810,
				new Watcher() {
					public void process(WatchedEvent event) {
					}
				});
	}

	/**
	 * 设置watch的线程
	 */
	@Override
	public void run() {
		
		Watcher wc = new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				// 结点数据改变之前的结点列表
				List<String> nodeListBefore = nodeList;
				// 主结点的数据发生改变时
				if (event.getType() == EventType.NodeDataChanged) {
					LOG.info("Node data changed:" + event.getPath());
				}
				if (event.getType() == EventType.NodeDeleted){
					LOG.info("Node deleted:" + event.getPath());
				}
				if(event.getType()== EventType.NodeCreated){
					LOG.info("Node created:"+event.getPath());
				}

				// 获取更新后的nodelist
				try {
					nodeList = zk.getChildren(event.getPath(), false);
				} catch (KeeperException e) {
					System.out.println(event.getPath()+" has no child, deleted.");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				List<String> nodeListNow = nodeList;
				// 增加结点
				if (nodeListBefore.size() < nodeListNow.size()) {
					for (String str : nodeListNow) {
						if (!nodeListBefore.contains(str)) {
							LOG.info("Node created:" + event.getPath() + "/" + str);
						}
					}
				}
			}
		};

		/**
		 * 持续监控PATH下的结点
		 */
		while (true) {
			try {
				zk.exists(PATH, wc);//所要监控的主结点
			} catch (KeeperException | InterruptedException e) {
				e.printStackTrace();
			}
			try {
				nodeList = zk.getChildren(PATH, wc);
			} catch (KeeperException | InterruptedException e) {
				e.printStackTrace();
			}
			// 对PATH下的每个结点都设置一个watcher

			for (String nodeName : nodeList) {
				try {
					zk.exists(PATH + "/" + nodeName, wc);
				} catch (KeeperException | InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			try {
				Thread.sleep(3000);// sleep一会,减少CUP占用率
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

该项目使用maven构建。pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.iflytek.cpcloud</groupId>
  <artifactId>zookeeper-test</artifactId>
  <version>0.1.0-SNAPSHOT</ve