ZooKeeper官方Java演示实例初体验

2018-05-05 03:09:39 +0000

本文分为下面3部分,可以完整的初体验Zookeeper的应用和学习官方推荐的编程方式。后面再分享ZooKeeper的多个实际应用。

一、ZooKeeper的安装;

二、ZooKeeper的官方实例运行演示,在window上启动计算器程序。

三、ZooKeeper的官方实例,此部分为翻译,大家将就看吧。

 

第一部分:安装ZooKeeper服务器

    1、打开ZooKeeper官网,在目录Getting Started/Download下载免安装包 zookeeper-3.4.12.tar.gz ;

    2、解压缩免安装包,例如 d:/zk/zookeeper-3.4.12;

    3、进入conf目录将zoo_sample.cfg文件名修改为zoo.cfg,并确保如下参数被配置;

 

     4、进入bin目录双击 zkServer.cmd 启动 ZooKeeper 服务,看到binding to port 0.0.0.0/0.0.0.0:2181表示启动成功;(前提是你的机器上安装/配置了JRE)

 

     5、进入bin目录双击 zkCli.cmd 打开一个客户端连接,我创建了一个zk节点,并修改它的值,见下图;

         >ls /

         >create /zk 333

         >set /zk 555

       更多命令行操作

  • 1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
  • 2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
  • 3. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
  • 4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
  • 5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
  • 6. 删除文件: delete /zk 将刚才创建的 znode 删除
  • 7. 退出客户端: quit
  • 8. 帮助命令: help

           

 

 

第二部分:ZooKeeper的官方实例演示

    1、下载 Executor.java 和 DataMonitor.java,为方便演示就在eclipse project中运行。

    2、给添加eclipse project依赖包。在下面目录找到这4个jar添加到eclipse project 依赖中。

        D:\zk\zookeeper-3.4.12\
                         zookeeper-3.4.12.jar

        D:\zk\zookeeper-3.4.12\lib\
                         log4j-1.2.17.jar
                         slf4j-api-1.7.25.jar
                         slf4j-log4j12-1.7.25.jar

   3、配置Run Configurations,按照下图配置启动参数。

 

 

    4、点击run按钮,运行Executor。我们可以看到在window上回启动计算器程序calc。

 

      5、试试在 更新节点的值,发现程序会新启动一个计算器应用。
           >set /zk 555

      

      6、是不是很神奇,更详细的去第三部分看ZooKeeper的工作原理和如何编程。

 

第三部分: ZooKeeper的官方实例讲解

一个简单的客户端监听程序

       我们开发了一个非常简单的客户端监听程序,向你介绍ZooKeeper的Java API。这个ZooKeeper客户端观察ZooKeeper节点的变化并通过启动或停止程序来响应变化事件。

 

需求

       有四个需求:

  1.        需要参数
                ZooKeeper服务器的IP
                需要绑定观察的znode名称
                可执行应用的名称和需要的参数
  2.         它获取与znode节点绑定的数据启动可执行应用。
  3.          如果znode节点发生变化,客户端重新获取数据,再次启动可执行应用。
  4.          如果znode节点影藏,客户端中止可执行应用。

 

程序设计

        通常,ZooKeeper应用程序分为两个单元,一个维护连接,另一个观察数据。 在这个应用程序中,名为Executor的类维护ZooKeeper连接,并且称为DataMonitor的类观察ZooKeeper树中的数据。 在这个应用程序中,名为Executor的类维护ZooKeeper连接,并且称为DataMonitor的类观察ZooKeeper树中的数据。 此外,Executor包含主线程并包含执行逻辑。 它负责少量的用户交互,就是你传入参数与可执行程序交互,以及简单的根据返回的节点的状态(根据需求)关闭并重新启动。

 

类:Executor.java

Executor对象是示例应用程序的主要容器。 它包含ZooKeeper对象,DataMonitor,如上面在程序设计中所述。

 // from the Executor class...
    
    public static void main(String[] args) {
        if (args.length < 4) {
            System.err
                    .println("USAGE: Executor hostPort znode filename program [args ...]");
            System.exit(2);
        }
        String hostPort = args[0];
        String znode = args[1];
        String filename = args[2];
        String exec[] = new String[args.length - 3];
        System.arraycopy(args, 3, exec, 0, exec.length);
        try {
            new Executor(hostPort, znode, filename, exec).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Executor(String hostPort, String znode, String filename,
            String exec[]) throws KeeperException, IOException {
        this.filename = filename;
        this.exec = exec;
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }

    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

来源: http://zookeeper.apache.org/doc/current/javaExample.html#sc_design

 

       回想下Executor的工作是启动和停止执行你在main函数中传入程序(名称)。Executor的工作,就是响应由ZooKeeper对象触发的事件。正如你在上面的代码中看到的那样,Executor将自己的引用作为Watcher参数传递到ZooKeeper构造函数中。它还将自己的引用作为DataMonitorListener参数的引用传递给DataMonitor构造函数。根据Executor的定义,它实现了这两个接口:

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...

       Watcher接口是ZooKeeper Java API中定义的。ZooKeeper使用Watcher来回传给它的容器。它只支持一个方法process(),ZooKeeper用它来交互主线程可能感兴趣的事件,例如ZooKeeper连接或会话的状态事件。在这个例子中Executor只是简单的将事件传递给DataMonitor来决定如何处理它们。为了更简单的说明这一点,按照惯例,Executor或类似Executor的对象“拥有”ZooKeeper连接,但可以将事件委托给其他对象的其他事件。这种编程方式被视为来触发Watch事件的默认通道。(关于这个后面再说明)

    public void process(WatchedEvent event) {
        dm.process(event);
    }

      另一方面,DataMonitorListener 接口不是ZooKeeper API的一部分。它完全是一个自定义接口,定义在这个简单的应用中。 DataMonitor对象使用它来回传给它的容器,它实际就是Executor对象。DataMonitorListener接口如下所示:

public interface DataMonitorListener {
    /**
    * The existence status of the node has changed.
    */
    void exists(byte data[]);

    /**
    * The ZooKeeper session is no longer valid.
    * 
    * @param rc
    * the ZooKeeper reason code
    */
    void closing(int rc);
}

 

        这接口定义在类DataMonitor中,且被类Excutor实现。当Executor.exists()被调用时,Excutor决定是否按照要求来启动或关闭可执行应用。回想下需求说明当znode节点不存在就关闭可执行应用。

        当调用Executor.closing()时,Executor决定是否自行关闭,以响应ZooKeeper连接永久消失。

        正如您可能已经猜到的那样,DataMonitor是调用这些方法的对象,以响应ZooKeeper状态的更改。

        以下是DataMonitorListener.exists()和DataMonitorListener.closing的Executor实现:

public void exists( byte[] data ) {
    if (data == null) {
        if (child != null) {
            System.out.println("Killing process");
            child.destroy();
            try {
                child.waitFor();
            } catch (InterruptedException e) {
            }
        }
        child = null;
    } else {
        if (child != null) {
            System.out.println("Stopping child");
            child.destroy();
            try {
               child.waitFor();
            } catch (InterruptedException e) {
            e.printStackTrace();
            }
        }
        try {
            FileOutputStream fos = new FileOutputStream(filename);
            fos.write(data);
            fos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            System.out.println("Starting child");
            child = Runtime.getRuntime().exec(exec);
            new StreamWriter(child.getInputStream(), System.out);
            new StreamWriter(child.getErrorStream(), System.err);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public void closing(int rc) {
    synchronized (this) {
        notifyAll();
    }
}

 

类:DataMonitor.java

        DataMonitor类具有ZooKeeper逻辑的优点。 它主要是异步和事件驱动的。 DataMonitor在构造函数中使用以下方法启动:

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
        DataMonitorListener listener) {
    this.zk = zk;
    this.znode = znode;
    this.chainedWatcher = chainedWatcher;
    this.listener = listener;
    
    // Get things started by checking if the node exists. We are going
    // to be completely event driven
    zk.exists(znode, true, this, null);
}

       对ZooKeeper.exists()的调用,是检查节点是否存在,并设置一个观察,传递自身引用作为回调对象。从这个意义上讲,它会使事情发生,因为真正的处理是发生在watch观察被触发的时候。

       注意:

  •        不要将结束回调与观察回调混淆。ZooKeeper.exists()结束回调,是在DataMonitor对象实现接口StatCallback.processResult()方法被调用结束之后,当异步的观察操作在ZooKeeper服务器完成时DataMonitor.processResult()将被回调。
  •         另一方面,观察触发会传递一个事件给Executor对象,因为Executor将自身注册为ZooKeeper对象的Watcher。
  •         另外,您可能会注意到DataMonitor也可以将自己注册为此特定观察事件的观察器。 这是ZooKeeper 3.0.0的新功能(支持多个Watcher)。 然而在这个例子中,DataMonitor并没有注册为Watcher。

         

         当ZooKeeper.exists()操作在服务器上完成时,ZooKeeper API会在客户端调用processResult结束回调:

public void processResult(int rc, String path, Object ctx, Stat stat) {
    boolean exists;
    switch (rc) {
    case Code.Ok:
        exists = true;
        break;
    case Code.NoNode:
        exists = false;
        break;
    case Code.SessionExpired:
    case Code.NoAuth:
        dead = true;
        listener.closing(rc);
        return;
    default:
        // Retry errors
        zk.exists(znode, true, this, null);
        return;
    }
 
    byte b[] = null;
    if (exists) {
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
    }     
    if ((b == null && b != prevData)
            || (b != null && !Arrays.equals(prevData, b))) {
        listener.exists(b);
        prevData = b;
    }
}

 

         代码首先检查错误代码是否存在znode,致命错误和可恢复错误。 如果文件(或znode)存在,它将从znode获取数据,然后在状态发生变化时调用Executor的exists()回调。请注意,它不必为getData调用处理任何异常,因为它观察了任何可能导致错误的事件:如果znode节点在调用方法ZooKeeper.getData()前被删除,ZooKeeper.exists()设置的观察事件会触发一个回调。如果发生连接错误,当连接恢复时,连接观察事件会被触发。

          最后,注意DataMonitor如何处理观察事件:

 public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
            case SyncConnected:
                // In this particular example we don't need to do anything
                // here - watches are automatically re-registered with 
                // server and any watches triggered while the client was 
                // disconnected will be delivered (in order of course)
                break;
            case Expired:
                // It's all over
                dead = true;
                listener.closing(KeeperException.Code.SessionExpired);
                break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

        如果客户端ZooKeeper库可以在会话过期(失效事件)之前重新建立到ZooKeeper的通信通道(SyncConnected事件),则会话的所有观察都将自动与服务器重新建立连接(观察自动重置在ZooKeeper 3.0.0版本中是新的功能)。有关更多信息,请参阅程序员指南中的ZooKeeper Watches。在这个函数中,当DataMonitor获得一个znode的事件时,它会调用ZooKeeper.exists()来找出发生了什么变化。