Curator框架的使用

x33g5p2x  于2021-03-14 发布在 Zookeeper  
字(15.4k)|赞(0)|评价(0)|浏览(605)

一、Curator框架使用

Curator框架,非常强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,session超时重连,主从选举,分布式计数器,分布式锁等适合各种复杂的zookeeper

二、依赖的引入

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.1</version>
</dependency>

三、Curator框架中使用链式编程的风格,易读性更强,试用工程方法创建连接对象

  1.使用CuratorFameworkFactory的两个静态工厂方法来实现
       参数一:connectString,连接串
       参数二: retryPolicy,重试连接策略,
       参数三 sessionTimeoutMs会话超时,默认60ms
       参数四 connectionTimeout 连接超时时间,默认为15000ms
  2.创建节点create方法,可选链式项
       creatingParentsIfNeeded:是否需要父节点
       withMode: 需要的模式
       forPath:     路径  key value
       withACL:    需要认证
   3.删除节点delete方法,可选择链式项
      deletingChildrenIfNeeded:递归的删除
      graranted:    安全的操作   
      withVersion:   删除版本
      forPath:          
    4.读取和修改数据
        gatData:  读取数据
        setData:   设置数据
    6.异步绑定回调方法,比如节点绑定一个回调函数,该回调函数可以输出服务器的状态码,以及服务的时间的类型,还可以加入一个线程池进行优化操作。
    7.读取子节点方法getChildren  
    8.判断节点是否存在方法checkExists   

四、代码实现

     public class CuratorBase {
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 
    public static void main(String[] args) throws Exception {
        //1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
//					.namespace("super")
                    .build();
        //3 开启连接
        cf.start();
//		System.out.println(States.CONNECTED);
//		System.out.println(cf.getState());
        // 新加、删除
        /**
        //4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
        //5 删除节点
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        */
        // 读取、修改
        /**
        //创建节点
//		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
//		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes());
        //读取节点
//		String ret1 = new String(cf.getData().forPath("/super/c2"));
//		System.out.println(ret1);
        //修改节点
//		cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
//		String ret2 = new String(cf.getData().forPath("/super/c2"));
//		System.out.println(ret2);	
        */
        // 绑定回调函数
        /**
        ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("线程为:" + Thread.currentThread().getName());
            }
        }, pool)
        .forPath("/super/c3","c3内容".getBytes());
        Thread.sleep(Integer.MAX_VALUE);
        */
        // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法
        /**
        List<String> list = cf.getChildren().forPath("/super");
        for(String p : list){
            System.out.println(p);
        }

        Stat stat = cf.checkExists().forPath("/super/c3");
        System.out.println(stat);
        Thread.sleep(2000);
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        */
        //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
    }
}      

总结: curator 对节点的简单操作

一、新建

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/super/c1","c1内容".getBytes());

二、删除节点

cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");

三、获取子节点

cf.getData().forPath("/super/c2")

四、修改子节点

cf.setData().forPath("/super/c2", "修改c2内容".getBytes());

重点

  ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("线程为:" + Thread.currentThread().getName());
            }
        }, pool)
         为什么采用线程池去承载,是因为在高并发的情况下,我们不能为每一个线程开辟一段空间,我们采用
 线程池去调度。当线程池里面的线程有空闲的时间,我们会调度线程池里面的线程去执行里面的代码。

五、 Curator 的监听 之一

  第一步导入jar包
    <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>
   使用NodeCache的方式去客服端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里我们主要有两种监听方式。
   NodeCacheListener: 监听节点的新增、修改操作
   PathChildrenCacheListener:监听子节点的新增、修改、删除操作

加缓存,不是重复注册

   public class CuratorWatcher1 {	
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms 	
public static void main(String[] args) throws Exception {		
	//1 重试策略:初试时间为1s 重试10次
	RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
	//2 通过工厂创建连接
	CuratorFramework cf = CuratorFrameworkFactory.builder()
				.connectString(CONNECT_ADDR)
				.sessionTimeoutMs(SESSION_OUTTIME)
				.retryPolicy(retryPolicy)
				.build();
	
	//3 建立连接
	cf.start();		
	//4 建立一个cache缓存
	final NodeCache cache = new NodeCache(cf, "/super", false);
	cache.start(true);
	cache.getListenable().addListener(new NodeCacheListener() {
		/**
		 * <B>方法名称:</B>nodeChanged<BR>
		 * <B>概要说明:</B>触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作。<BR>
		 * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
		 */
		@Override
		public void nodeChanged() throws Exception {
			System.out.println("路径为:" + cache.getCurrentData().getPath());
			System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
			System.out.println("状态为:" + cache.getCurrentData().getStat());
			System.out.println("---------------------------------------");
		}
	});
	Thread.sleep(1000);
	cf.create().forPath("/super", "123".getBytes());		
	Thread.sleep(1000);
	cf.setData().forPath("/super", "456".getBytes());	
	Thread.sleep(1000);
	cf.delete().forPath("/super");		
	Thread.sleep(Integer.MAX_VALUE);

	}
}

六、 Curator 的监听 之二

public class CuratorWatcher {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 

    public static void main(String[] args) throws Exception {

        //1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();

        //3 建立连接
        cf.start();

        //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
        PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
        //5 在初始化的时候就进行缓存监听
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            /**
             * <B>方法名称:</B>监听子节点变更<BR>
             * <B>概要说明:</B>新建、修改、删除<BR>
             * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
             */
            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED :" + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED :" + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED :" + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });

        //创建本身节点不发生变化
        cf.create().forPath("/super", "init".getBytes());

        //添加子节点
        Thread.sleep(1000);
        cf.create().forPath("/super/c1", "c1内容".getBytes());
        Thread.sleep(1000);
        cf.create().forPath("/super/c2", "c2内容".getBytes());

        //修改子节点
        Thread.sleep(1000);
        cf.setData().forPath("/super/c1", "c1更新内容".getBytes());

        //删除子节点
        Thread.sleep(1000);
        cf.delete().forPath("/super/c2");		

        //删除本身节点
        Thread.sleep(1000);
        cf.delete().deletingChildrenIfNeeded().forPath("/super");

        Thread.sleep(Integer.MAX_VALUE);
    }
}

七、分布式锁

在java高并发和多线程中是怎么解决的呢?

      在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一点需要同步操作
    (java 可提供synchronized 或者 Reentrantlock实现)
  
     public class Lock1 {

static ReentrantLock reentrantLock = new ReentrantLock();
static int count = 10;
public static void genarNo(){
	try {
		reentrantLock.lock();
		count--;
		//System.out.println(count);
	} finally {
		reentrantLock.unlock();
	}
}

public static void main(String[] args) throws Exception{
	
	final CountDownLatch countdown = new CountDownLatch(1);
	for(int i = 0; i < 10; i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					countdown.await();
					genarNo();
					SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
					System.out.println(sdf.format(new Date()));
					//System.out.println(System.currentTimeMillis());
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
				}
			}
		},"t" + i).start();
	}
	Thread.sleep(50);
	countdown.countDown();

	
	}
}

Zookpeer分布式锁的实现

  public class Lock2 {

        /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;//ms 

        static int count = 10;
        public static void genarNo(){
            try {
                count--;
                System.out.println(count);
            } finally {

            }
        }

        public static void main(String[] args) throws Exception {

            //1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .sessionTimeoutMs(SESSION_OUTTIME)
                        .retryPolicy(retryPolicy)
    //					.namespace("super")
                        .build();
            //3 开启连接
            cf.start();

            //4 分布式锁
            final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
            //final ReentrantLock reentrantLock = new ReentrantLock();
            final CountDownLatch countdown = new CountDownLatch(1);

            for(int i = 0; i < 10; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            countdown.await();
                            //加锁
                            lock.acquire();
                            //reentrantLock.lock();
                            //-------------业务处理开始
                            //genarNo();
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                            System.out.println(sdf.format(new Date()));
                            //System.out.println(System.currentTimeMillis());
                            //-------------业务处理结束
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                //释放
                                lock.release();
                                //reentrantLock.unlock();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                },"t" + i).start();
            }
            Thread.sleep(100);
            countdown.countDown();

        }
    }

八、分布式计数器

   分布式计数器,在高并发中使用AtomicInteger这种经典的方式、如果对于一个jvm
   的场景当然没有问题,但是我们现在是分布式、就需要利用Curator框架的Distributed
   AtomicInteger了
       public class CuratorAtomicInteger {
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 
    public static void main(String[] args) throws Exception {
        //1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();
        //3 开启连接
        cf.start();
        //cf.delete().forPath("/super");
        //4 使用DistributedAtomicInteger
        DistributedAtomicInteger atomicIntger = 
                new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000));
        AtomicValue<Integer> value = atomicIntger.add(1);
        System.out.println(value.succeeded());
        System.out.println(value.postValue());	//最新值
        System.out.println(value.preValue());	//原始值
    }
}

九、barrier

public class CuratorBarrier1 {

        /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;//ms 

        public static void main(String[] args) throws Exception {



            for(int i = 0; i < 5; i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                            CuratorFramework cf = CuratorFrameworkFactory.builder()
                                        .connectString(CONNECT_ADDR)
                                        .retryPolicy(retryPolicy)
                                        .build();
                            cf.start();

                            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
                            Thread.sleep(1000 * (new Random()).nextInt(3)); 
                            System.out.println(Thread.currentThread().getName() + "已经准备");
                            barrier.enter();
                            System.out.println("同时开始运行...");
                            Thread.sleep(1000 * (new Random()).nextInt(3));
                            System.out.println(Thread.currentThread().getName() + "运行完毕");
                            barrier.leave();
                            System.out.println("同时退出运行...");


                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                },"t" + i).start();
            }



        }
    }






    public class CuratorBarrier2 {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 

    static DistributedBarrier barrier = null;

    public static void main(String[] args) throws Exception {



        for(int i = 0; i < 5; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                        CuratorFramework cf = CuratorFrameworkFactory.builder()
                                    .connectString(CONNECT_ADDR)
                                    .sessionTimeoutMs(SESSION_OUTTIME)
                                    .retryPolicy(retryPolicy)
                                    .build();
                        cf.start();
                        barrier = new DistributedBarrier(cf, "/super");
                        System.out.println(Thread.currentThread().getName() + "设置barrier!");
                        barrier.setBarrier();	//设置
                        barrier.waitOnBarrier();	//等待
                        System.out.println("---------开始执行程序----------");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },"t" + i).start();
        }

        Thread.sleep(5000);
        barrier.removeBarrier();	//释放


    }
}

十、集群管理

       public class CuratorWatcher {

        /** 父节点path */
        static final String PARENT_PATH = "/super";

        /** zookeeper服务器地址 */
        public static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";	/** 定义session失效时间 */

        public static final int SESSION_TIMEOUT = 30000;

        public CuratorWatcher() throws Exception{
            //1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_TIMEOUT)
                    .retryPolicy(retryPolicy)
                    .build();
            //3 建立连接
            cf.start();

            //4 创建跟节点
            if(cf.checkExists().forPath(PARENT_PATH) == null){
                cf.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH,"super init".getBytes());
            }

            //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
            PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true);
            //5 在初始化的时候就进行缓存监听
            cache.start(StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                /**
                 * <B>方法名称:</B>监听子节点变更<BR>
                 * <B>概要说明:</B>新建、修改、删除<BR>
                 * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
                 */
                @Override
                public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED :" + event.getData().getPath());
                        System.out.println("CHILD_ADDED :" + new String(event.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED :" + event.getData().getPath());
                        System.out.println("CHILD_UPDATED :" + new String(event.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED :" + event.getData().getPath());
                        System.out.println("CHILD_REMOVED :" + new String(event.getData().getData()));
                        break;
                    default:
                        break;
                    }
                }
            });
        }

    }
    
    
            public class Client1 {

                    public static void main(String[] args) throws Exception{

                        CuratorWatcher watcher = new CuratorWatcher();
                        Thread.sleep(100000000);
                    }
       }
       
       
        public class Client2 {

            public static void main(String[] args) throws Exception{

                CuratorWatcher watcher = new CuratorWatcher();
                Thread.sleep(100000000);
            }
        }

相关文章