zookeeper java api 操作(二)| 异步处理结果

x33g5p2x  于2021-12-20 转载在 其他  
字(14.9k)|赞(0)|评价(0)|浏览(341)

一、概述

     上一篇文章提到的可以通过java api操作zookeeper数据节点,并对节点状态进行监听

      https://blog.csdn.net/qq_33513250/article/details/102289689

      下面介绍操作znode并异步处理结果:

           创建znode、获取数据、设置数据、获取子节点、权限设置、节点存在判断、删除节点

二、异步处理回调

** 1.创建节点**

package com.szwn.zk;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class AsyncCreateNote implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncCreateNote());
        System.out.println(zookeeper.getState());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.异步创建节点,节点类型为ephemeral,并设置ctx值
        zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context. ");

        // 4.异步创建节点,节点类型为ephemeral,并设置ctx值,相同路径报错
        zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context. ");

        // 5.异步创建节点,节点类型为ephemeral_sequential,并设置ctx值
        zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), "I am context. ");
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 监听连接成功事件
            connectedSemaphore.countDown();
        }
    }
}

class IStringCallback implements AsyncCallback.StringCallback {
    public void processResult(int rc, String path, Object ctx, String name) {
        // 异步回调处理创建节点结果
        System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
    }
}

运行结果如下,0表示运行成功,-110表示NodeExists.

代码中创建节点回调接口为 AsyncCallback.StringCallback

AsyncCallback接口中包含所有操作znode节点结果的回调接口 

package org.apache.zookeeper;

import java.util.List;

import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public interface AsyncCallback {
    // 获取stat回调
    interface StatCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, Stat stat);
    }
    // 修改数据回调
    interface DataCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, byte data[],
                Stat stat);
    }
    // 权限控制回调
    interface ACLCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<ACL> acl, Stat stat);
    }
    // 子节点回调
    interface ChildrenCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<String> children);
    }
    // 子节点回调
    interface Children2Callback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx,
                List<String> children, Stat stat);
    }
    // 创建节点回调
    interface StringCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, String name);
    }
    // 删除节点回调
    interface VoidCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx);
    }
}

** 2.获取数据** 

package com.szwn.zk;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AsyncGetData implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetData());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.创建节点,节点类型为ephemeral
        zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path);

        // 4.获取数据,异步处理结果并且继续监听事件
        zk.getData(path, true, new IDataCallback(), null);

        // 5.修改数据
        zk.setData(path, "123".getBytes(), -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                // 监听连接成功事件
                connectedSemaphore.countDown();
            } else if (event.getType() == Event.EventType.NodeDataChanged) {
                try {
                    // 监听数据改变事件,并且异步处理结果,同时继续监听
                    zk.getData(event.getPath(), true, new IDataCallback(), null);
                } catch (Exception e) {
                }
            }
        }
    }
}

class IDataCallback implements AsyncCallback.DataCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        // 异步回调处理获取数据结果
        System.out.println("rc: " + rc + ", path: " + path + ", data: " + new String(data));
        System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
    }
}

3.设置数据

package com.szwn.zk;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class AsyncSetData implements Watcher{
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncSetData());
        // 2.等待连接完成
        connectedSemaphore.await();

        // 3.创建节点,节点类型为ephemeral
        zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path);
        // 4.修改数据并异步处理回调
        zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 监听连接成功事件
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}

class IStatCallback implements AsyncCallback.StatCallback {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // 异步回调处理获取数据结果
        System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
    }
}

4.获取子节点

package com.szwn.zk;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AsyncGetChildren implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetChildren());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.创建节点,节点类型为persistent
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        // 4.创建节点/path/c1,节点类型为ephemeral
        zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/c1");
        // 5.获取path子节点信息,设置监听为true,回调异步处理结果
        zk.getChildren(path, true, new IChildren2Callback(), null);
        // 6.创建节点/path/c2,节点类型为ephemeral
        zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/c2");

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            // 监听连接成功事件
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
                // 监听子节点变化事件
            } else if (event.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                    // 获取path子节点信息,设置监听为true,回调异步处理结果
                    zk.getChildren(event.getPath(), true, new IChildren2Callback(), null);
                } catch (Exception e) {
                }
            }
        }
    }
}

class IChildren2Callback implements AsyncCallback.Children2Callback {
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // 异步回调处理获取子节点信息结果
        System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: " + ctx + ", children list: " + children
                        + ", stat: " + stat);
    }
}

5.权限设置获取

package com.szwn.zk;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

public class AsyncGetSetAcl implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-async-acl";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetSetAcl());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.创建节点,节点类型为ephemeral
        zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path);

        // 4.获取acl,异步处理结果并且继续监听事件
        zk.getACL(path,null,new IACLCallback(),"async process get acl");

        // 5.修改acl
        List<ACL> digest = Arrays.asList(new ACL(1, new Id("digest", "foo:true")));
        zk.setACL(path,digest,-1,new IStatCallbackAcl(),"async process set acl");

        // 6.获取acl,异步处理结果并且继续监听事件
        zk.getACL(path,null,new IACLCallback(),"async process get acl2");

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                // 监听连接成功事件
                connectedSemaphore.countDown();
            }
        }
    }
}

class IACLCallback implements AsyncCallback.ACLCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
        // 异步回调处理获取权限结果
        System.out.println("rc: " + rc + ", path: " + path + ", acl: " + acl);
        System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
    }
}

class IStatCallbackAcl implements AsyncCallback.StatCallback {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // 异步回调处理获取数据结果
        System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
    }
}

6.节点存在

package com.szwn.zk;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AsyncExist implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-async-exist";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncExist());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.请求节点是否存在,设置监听为true,并异步处理返回结果
        zk.exists(path, true, new IIStatCallback(), null);
        // 4.创建节点
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 5.设置数据
        zk.setData(path, "123".getBytes(), -1);
        // 6.创建子节点path/exist
        zk.create(path + "/exist", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/c1");
        // 7.删除子节点path/exist
        zk.delete(path + "/exist", -1);
        zk.delete(path, -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        try {
            if (Event.KeeperState.SyncConnected == event.getState()) {
                // 监听连接成功事件
                if (Event.EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                    // 监听节点创建事件,并重新加入监听,异步处理返回结果
                } else if (Event.EventType.NodeCreated == event.getType()) {
                    System.out.println("success create znode: " + event.getPath());
                    zk.exists(event.getPath(), true, new IIStatCallback(), null);
                } else if (Event.EventType.NodeDeleted == event.getType()) {
                    System.out.println("success delete znode: " + event.getPath());
                    zk.exists(event.getPath(), true, new IIStatCallback(), null);
                } else if (Event.EventType.NodeDataChanged == event.getType()) {
                    System.out.println("data changed of znode: " + event.getPath());
                    zk.exists(event.getPath(), true, new IIStatCallback(), null);
                }
            }
        } catch (Exception e) {
        }
    }
}

class IIStatCallback implements AsyncCallback.StatCallback {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        // 异步回调处理结果
        System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
    }
}

rc: -101(node does not exist)

7.删除节点

package com.szwn.zk;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class AsyncDeleteNode implements Watcher{
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-async-delete";
        // 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
        zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000,
                new AsyncDeleteNode());
        // 2.等待连接完成
        connectedSemaphore.await();
        // 3.创建节点
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        // 4.创建子节点/path/c1
        zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/c1");
        // 5.删除节点/path,直接删除,有子节点会报错
        zk.delete(path, -1, new IVoidCallback(), null);
        // 6.删除子节点/path/c1
        zk.delete(path + "/c1", -1, new IVoidCallback(), null);
        // 7.删除节点/path
        zk.delete(path, -1, new IVoidCallback(), null);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            if (Event.EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}

class IVoidCallback implements AsyncCallback.VoidCallback {
    public void processResult(int rc, String path, Object ctx) {
        System.out.println(rc + ", " + path + ", " + ctx);
    }
}

rc: -111(the node has children)

相关文章