zookeeper java api 操作(四) | curator

x33g5p2x  于2021-12-20 转载在 其他  
字(12.9k)|赞(0)|评价(0)|浏览(413)

一、概述

上一篇文章学习了用zkClient客户端对zookeeper进行操作,下面我们将学习由Netflix公司的Jordan Zimmerman编写的一套开源的

Zookeeper客户端框架Curator。Curator与zkClient一样解决了底层的细节开发工作,包括session过期重连、反复注册、异常报错

的封装等。除此之外,Curator目前是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和

可读性更强的Fluent风格的客户端API框架,并对更多的应用场景(Recipe,如共享锁服务、Master选举和分布式计数器等)进行了封装,

方便用户使用和开发。目前很多项目均使用Curator作为zookeeper客户端进行开发,如阿里的开源分布式事务管理框架seata。

二、pom 依赖

<dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.1</version>
</dependency>

三、Curator

1.创建节点

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

/**
 * Demonstrates synchronous znode creation with Curator, once with the default ACL
 * and once with an explicitly supplied ACL.
 */
public class CreateNode {
    /**
     * Connects to the local three-server ensemble and creates two EPHEMERAL znodes
     * under {@code /curator-create}, creating the parent node when it is missing.
     *
     * @param args unused
     * @throws Exception if either create operation fails
     */
    public static void main(String[] args) throws Exception {
        String path1 = "/curator-create/c1";
        String path2 = "/curator-create/c2";
        // Build the client: ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
        // retry policy ExponentialBackoffRetry(1000, 3), i.e. base sleep 1000 ms and at most 3 retries.
        // try-with-resources guarantees the client is closed even when a create fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .aclProvider(new DefaultACLProvider())
                .build()) {
            // Open the connection.
            client.start();
            // Create an EPHEMERAL node, creating parent nodes as needed.
            // The charset is explicit so the stored bytes do not depend on the platform default.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path1);
            // Create a second EPHEMERAL node, this time passing the ACL explicitly.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path2);
        }
    }
}

2.创建节点(BackGround模式)

package com.szwn.curator;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CreateNodeBackground {
	static String path = "/curator-create-backGround/c1";
	// 创建CuratorFramework 客户端实例,集群服务器地址为127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,session timeout5000ms
	// 回调策略为ExponentialBackoffRetry,即为retries 3 times with increasing 1000 sleep time between retries
	static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
			.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
	// 2个信号量,BackGround 回调callBack成功再停止
	static CountDownLatch semaphore = new CountDownLatch(2);
	// 固定线程池,2个线程
	static ExecutorService es = Executors.newFixedThreadPool(2);

	public static void main(String[] args) throws Exception {
		// 开始连接
		client.start();
		System.out.println("Main thread: " + Thread.currentThread().getName());

		// 创建 EPHEMERAL类型节点,并创建父节点,使用一部线程background方式处理创建结果
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
				// 打印事件信息
				System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
				System.out.println();
				semaphore.countDown();
			}
		}, es).forPath(path, "init".getBytes());

		// 创建相同路径节点
		client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
			public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
				// 打印事件信息
				System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
				semaphore.countDown();
			}
		}).forPath(path, "init".getBytes());

		semaphore.await();
		// 关闭线程池
		es.shutdown();
		// 关闭连接
		client.close();
	}
}

运行结果:两次创建中一个成功,code 为 0;另一个失败,code 为 -110(NodeExists,节点已经存在)。

3.获取数据

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * Demonstrates reading a znode's data together with its {@link Stat} using Curator.
 */
public class GetData {
	/**
	 * Creates an EPHEMERAL node holding {@code "init"}, reads it back and prints the content.
	 *
	 * @param args unused
	 * @throws Exception if the create or read operation fails
	 */
	public static void main(String[] args) throws Exception {
		String path = "/curator-create-getData/c1";
		// Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
		// retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
		// try-with-resources closes the client even when an operation fails.
		try (CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
				.sessionTimeoutMs(5000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3))
				.build()) {
			// Open the connection.
			client.start();
			// Create an EPHEMERAL node, creating parent nodes as needed.
			client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
					.forPath(path, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
			Stat stat = new Stat();
			// Read the node's data; its metadata is stored into `stat` as a side effect.
			byte[] data = client.getData().storingStatIn(stat).forPath(path);
			// Decode with the same explicit charset that was used for writing.
			System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
		}
	}
}

4.设置数据

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * Demonstrates version-checked (optimistic) updates of a znode with Curator.
 */
public class SetData {
	/**
	 * Creates a node, updates it with the version just read, then repeats the update
	 * with the same (now stale) version to show the resulting failure.
	 *
	 * @param args unused
	 * @throws Exception if creating, reading or the first update of the node fails
	 */
	public static void main(String[] args) throws Exception {
		String path = "/curator-create-setData/c1";
		// Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
		// retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
		// try-with-resources closes the client even when an operation fails.
		try (CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
				.sessionTimeoutMs(5000)
				.retryPolicy(new ExponentialBackoffRetry(1000, 3))
				.build()) {
			// Open the connection.
			client.start();
			// Create an EPHEMERAL node, creating parent nodes as needed.
			client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
					.forPath(path, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
			Stat stat = new Stat();
			// Read the node so that `stat` holds its current version.
			client.getData().storingStatIn(stat).forPath(path);
			// Update the node, conditional on the version just read.
			Stat stat1 = client.setData().withVersion(stat.getVersion()).forPath(path);
			System.out.println("Success set node for : " + path + ", new version: "
					+ stat1.getVersion());
			try {
				// Repeat the update with the version read before the first update;
				// this is the step the demo expects to fail.
				client.setData().withVersion(stat.getVersion()).forPath(path);
			} catch (Exception e) {
				// Expected: the version no longer matches. The failure is only reported,
				// not rethrown, because it is the point of the demonstration.
				System.out.println("Fail set node due to " + e.getMessage());
			}
		}
	}
}

5.获取子节点

package com.szwn.curator;

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * Demonstrates listing the children of a znode with Curator.
 */
public class Children {
    /**
     * Creates two EPHEMERAL children under {@code /curator-children} and prints the child list.
     *
     * @param args unused
     * @throws Exception if a create or the children lookup fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/curator-children";
        String path1 = path + "/c1";
        String path2 = path + "/c2";
        // Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
        // retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
        // try-with-resources closes the client even when an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .aclProvider(new DefaultACLProvider())
                .build()) {
            // Open the connection.
            client.start();
            // Create the first EPHEMERAL child, creating the parent node as needed.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path1);
            // Create the second EPHEMERAL child (no explicit ACL is passed here).
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("success create znode: " + path2);
            // List the children of /curator-children.
            List<String> list = client.getChildren().forPath(path);
            System.out.println(list);
        }
    }
}

6.EnsurePath工具类

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.CreateMode;

/**
 * Demonstrates the {@code EnsurePath} utility: make sure a parent path exists,
 * then create children under it without {@code creatingParentsIfNeeded()}.
 *
 * <p>NOTE(review): {@code EnsurePath} is marked deprecated in recent Curator releases;
 * confirm against the Curator version in use before copying this pattern.
 */
public class EnsureZkPath {
	static String path = "/curator_ensure_path";
	// Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
	// retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
	static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
			.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

	/**
	 * Ensures {@code /curator_ensure_path} exists and creates two EPHEMERAL children under it.
	 *
	 * @param args unused
	 * @throws Exception if ensuring the path or creating a child fails
	 */
	public static void main(String[] args) throws Exception {
		try {
			// Open the connection.
			client.start();

			org.apache.curator.utils.EnsurePath ensurePath = new org.apache.curator.utils.EnsurePath(path);
			// First call: syncs and creates the path if needed.
			ensurePath.ensure(client.getZookeeperClient());
			// The parent now exists, so the child can be created directly.
			String path1 = path + "/c1";
			client.create().withMode(CreateMode.EPHEMERAL)
					.forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
			System.out.println("success create znode: " + path1);
			// Subsequent calls are no-ops because the path already exists.
			ensurePath.ensure(client.getZookeeperClient());
			// Create the second child directly as well.
			String path2 = path + "/c2";
			client.create().withMode(CreateMode.EPHEMERAL)
					.forPath(path2, "init2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
			// Report the node that was actually just created (previously printed path1 again).
			System.out.println("success create znode: " + path2);
		} finally {
			// Always release the connection, including when a step above threw.
			client.close();
		}
	}
}

7.ZKPaths工具类

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;

/**
 * Demonstrates the static helpers of Curator's {@code ZKPaths} utility class:
 * path composition, path splitting, and recursive create/list/delete on a raw
 * {@link ZooKeeper} handle.
 */
public class ZKPath {
	static String path = "/curator_zkPath";
	// Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
	// retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
	static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
			.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

	/**
	 * Runs each {@code ZKPaths} helper once and prints its result.
	 *
	 * @param args unused
	 * @throws Exception if obtaining the ZooKeeper handle or any node operation fails
	 */
	public static void main(String[] args) throws Exception {
		try {
			// Open the connection.
			client.start();
			// Obtain the underlying native ZooKeeper handle; reused for all calls below.
			ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();
			// Prefix a node path with the namespace /curator_zkPath.
			// The node path is passed with a leading '/' so that it is a valid
			// ZooKeeper path on its own, which newer Curator versions check.
			System.out.println(org.apache.curator.utils.ZKPaths.fixForNamespace(path, "/sub"));
			// Join a parent path and a child name into one path.
			System.out.println(org.apache.curator.utils.ZKPaths.makePath(path, "sub"));
			// Extract the node name (last path segment) from a full path.
			System.out.println(org.apache.curator.utils.ZKPaths.getNodeFromPath("/curator_zkPath/sub1"));
			// Split a full path into its parent path and node name.
			PathAndNode pn = org.apache.curator.utils.ZKPaths.getPathAndNode("/curator_zkPath/sub1");
			System.out.println(pn.getPath());
			System.out.println(pn.getNode());
			String dir1 = path + "/child1";
			String dir2 = path + "/child2";
			// Create both nodes (no data supplied), including any missing parents.
			org.apache.curator.utils.ZKPaths.mkdirs(zookeeper, dir1);
			org.apache.curator.utils.ZKPaths.mkdirs(zookeeper, dir2);
			// List the children of `path` in sorted order.
			System.out.println(org.apache.curator.utils.ZKPaths.getSortedChildren(zookeeper, path));
			// Delete all children of `path`; the `true` flag also deletes `path` itself.
			org.apache.curator.utils.ZKPaths.deleteChildren(zookeeper, path, true);
		} finally {
			// Always release the connection, including when a step above threw.
			client.close();
		}
	}
}

8.删除节点

package com.szwn.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * Demonstrates version-checked, recursive deletion of a znode with Curator.
 */
public class DeleteData {
    /**
     * Creates two EPHEMERAL children under {@code /zk-curator}, prints them, then deletes
     * the parent together with its children using the parent's current version.
     *
     * @param args unused
     * @throws Exception if creating, reading or deleting a node fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator";
        String path1 = path + "/c1";
        String path2 = path + "/c2";
        // Client for the ensemble 127.0.0.1:2181-2183, session timeout 5000 ms,
        // retry policy ExponentialBackoffRetry(1000, 3): base sleep 1000 ms, at most 3 retries.
        // try-with-resources closes the client even when an operation fails.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            // Open the connection.
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path1, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path2, "init".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Read the parent's real metadata so the delete below is checked against its
            // actual version rather than the default value of an unpopulated Stat.
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath(path);
            // Show the children of `path`.
            System.out.println(client.getChildren().forPath(path));
            // Delete the node and all of its children, conditional on the version just read.
            client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
            System.out.println("success delete znode " + path);
        }
        // No sleep after closing: with the client closed there is nothing left to observe,
        // so the program simply exits.
    }
}

相关文章