文章13 | 阅读 5142 | 点赞0
上一篇文章提到的可以通过java api操作zookeeper数据节点,并对节点状态进行监听
https://blog.csdn.net/qq_33513250/article/details/102289689
下面介绍操作znode并异步处理结果:
创建znode、获取数据、设置数据、获取子节点、权限设置、节点存在判断、删除节点
package com.szwn.zk;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class AsyncCreateNote implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncCreateNote());
System.out.println(zookeeper.getState());
// 2.等待连接完成
connectedSemaphore.await();
// 3.异步创建节点,节点类型为ephemeral,并设置ctx值
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
// 4.异步创建节点,节点类型为ephemeral,并设置ctx值,相同路径报错
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
// 5.异步创建节点,节点类型为ephemeral_sequential,并设置ctx值
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new IStringCallback(), "I am context. ");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 监听连接成功事件
connectedSemaphore.countDown();
}
}
}
class IStringCallback implements AsyncCallback.StringCallback {
public void processResult(int rc, String path, Object ctx, String name) {
// 异步回调处理创建节点结果
System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
}
}
运行结果如下,0表示运行成功,-110表示NodeExists.
代码中创建节点回调接口为 AsyncCallback.StringCallback
AsyncCallback接口中包含所有操作znode节点结果的回调接口
package org.apache.zookeeper;
import java.util.List;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
public interface AsyncCallback {
// 获取stat回调
interface StatCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, Stat stat);
}
// 修改数据回调
interface DataCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, byte data[],
Stat stat);
}
// 权限控制回调
interface ACLCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<ACL> acl, Stat stat);
}
// 子节点回调
interface ChildrenCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<String> children);
}
// 子节点回调
interface Children2Callback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx,
List<String> children, Stat stat);
}
// 创建节点回调
interface StringCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, String name);
}
// 删除节点回调
interface VoidCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx);
}
}
package com.szwn.zk;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class AsyncGetData implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetData());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为ephemeral
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
// 4.获取数据,异步处理结果并且继续监听事件
zk.getData(path, true, new IDataCallback(), null);
// 5.修改数据
zk.setData(path, "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
// 监听连接成功事件
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
// 监听数据改变事件,并且异步处理结果,同时继续监听
zk.getData(event.getPath(), true, new IDataCallback(), null);
} catch (Exception e) {
}
}
}
}
}
class IDataCallback implements AsyncCallback.DataCallback {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 异步回调处理获取数据结果
System.out.println("rc: " + rc + ", path: " + path + ", data: " + new String(data));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
}
}
package com.szwn.zk;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
public class AsyncSetData implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncSetData());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为ephemeral
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
// 4.修改数据并异步处理回调
zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 监听连接成功事件
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
class IStatCallback implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 异步回调处理获取数据结果
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
package com.szwn.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class AsyncGetChildren implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetChildren());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为persistent
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
// 4.创建节点/path/c1,节点类型为ephemeral
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c1");
// 5.获取path子节点信息,设置监听为true,回调异步处理结果
zk.getChildren(path, true, new IChildren2Callback(), null);
// 6.创建节点/path/c2,节点类型为ephemeral
zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c2");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 监听连接成功事件
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
// 监听子节点变化事件
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
// 获取path子节点信息,设置监听为true,回调异步处理结果
zk.getChildren(event.getPath(), true, new IChildren2Callback(), null);
} catch (Exception e) {
}
}
}
}
}
class IChildren2Callback implements AsyncCallback.Children2Callback {
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// 异步回调处理获取子节点信息结果
System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: " + ctx + ", children list: " + children
+ ", stat: " + stat);
}
}
package com.szwn.zk;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
public class AsyncGetSetAcl implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-async-acl";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetSetAcl());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点,节点类型为ephemeral
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
// 4.获取acl,异步处理结果并且继续监听事件
zk.getACL(path,null,new IACLCallback(),"async process get acl");
// 5.修改acl
List<ACL> digest = Arrays.asList(new ACL(1, new Id("digest", "foo:true")));
zk.setACL(path,digest,-1,new IStatCallbackAcl(),"async process set acl");
// 6.获取acl,异步处理结果并且继续监听事件
zk.getACL(path,null,new IACLCallback(),"async process get acl2");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
// 监听连接成功事件
connectedSemaphore.countDown();
}
}
}
}
class IACLCallback implements AsyncCallback.ACLCallback {
@Override
public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
// 异步回调处理获取权限结果
System.out.println("rc: " + rc + ", path: " + path + ", acl: " + acl);
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
}
}
class IStatCallbackAcl implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 异步回调处理获取数据结果
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
package com.szwn.zk;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class AsyncExist implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-async-exist";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncExist());
// 2.等待连接完成
connectedSemaphore.await();
// 3.请求节点是否存在,设置监听为true,并异步处理返回结果
zk.exists(path, true, new IIStatCallback(), null);
// 4.创建节点
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 5.设置数据
zk.setData(path, "123".getBytes(), -1);
// 6.创建子节点path/exist
zk.create(path + "/exist", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
// 7.删除子节点path/exist
zk.delete(path + "/exist", -1);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
try {
if (Event.KeeperState.SyncConnected == event.getState()) {
// 监听连接成功事件
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
// 监听节点创建事件,并重新加入监听,异步处理返回结果
} else if (Event.EventType.NodeCreated == event.getType()) {
System.out.println("success create znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
} else if (Event.EventType.NodeDeleted == event.getType()) {
System.out.println("success delete znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
} else if (Event.EventType.NodeDataChanged == event.getType()) {
System.out.println("data changed of znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
}
}
} catch (Exception e) {
}
}
}
class IIStatCallback implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 异步回调处理结果
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
rc: -101(node does not exist)
package com.szwn.zk;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class AsyncDeleteNode implements Watcher{
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-async-delete";
// 1.连接zookeeper 客户端,参数分别为服务器连接地址,集群用逗号隔开,sessionTimeout,事件监听器
zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000,
new AsyncDeleteNode());
// 2.等待连接完成
connectedSemaphore.await();
// 3.创建节点
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
// 4.创建子节点/path/c1
zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
// 5.删除节点/path,直接删除,有子节点会报错
zk.delete(path, -1, new IVoidCallback(), null);
// 6.删除子节点/path/c1
zk.delete(path + "/c1", -1, new IVoidCallback(), null);
// 7.删除节点/path
zk.delete(path, -1, new IVoidCallback(), null);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
class IVoidCallback implements AsyncCallback.VoidCallback {
public void processResult(int rc, String path, Object ctx) {
System.out.println(rc + ", " + path + ", " + ctx);
}
}
rc: -111(the node has children)
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33513250/article/details/102309556
内容来源于网络,如有侵权,请联系作者删除!