你的位置:首页 > ASP.net教程

[ASP.net教程](原) 2.3 Curator使用


本文为原创文章,转载请注明出处,谢谢

Curator使用

1、jar包引入,演示版本为2.6.0,非maven项目,可以下载jar包导入到项目中

 

    <dependency>      <groupId>org.apache.curator</groupId>      <artifactId>curator-framework</artifactId>      <version>2.6.0</version>    </dependency>    <dependency>      <groupId>org.apache.curator</groupId>      <artifactId>curator-recipes</artifactId>      <version>2.6.0</version>    </dependency> 

 

2、RetryPolicy:重试机制

  • ExponentialBackoffRetry:每次重试会增加重试时间baseSleepTimeMs
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:基本重试时间差
      • maxRetries:最大重试次数
      • maxSleepMs:最大重试时间
  • RetryNTimes
    • RetryNTimes(int n, int sleepMsBetweenRetries)
      • n:重试次数
      • sleepMsBetweenRetries:每次重试间隔时间
  • RetryUntilElapsed
    • RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
      • maxElapsedTimeMs:最大重试时间
      • sleepMsBetweenRetries:每次重试间隔时间
  • BoundedExponentialBackoffRetry、RetryOneTime、SleepingRetry

3、创建Zookeeper连接

  • 传统方式

   示例:CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);

   API:  

newClient(java.lang.String connectString, org.apache.curator.RetryPolicy retryPolicy)newClient(java.lang.String connectString, int sessionTimeoutMs, int connectionTimeoutMs, org.apache.curator.RetryPolicy retryPolicy)

    • connectString:Zookeeper服务器地址
    • retryPolicy:自定义重试机制
    • sessionTimeoutMs:session超时时间
    • connectionTimeoutMs:连接超时时间
  • 链式方式 
curatorFramework = CuratorFrameworkFactory.builder()                .connectString("192.168.117.128:2181")                //.authorization() 设置访问权限 设置方法同原生API                .sessionTimeoutMs(5000).connectionTimeoutMs(5000)                .retryPolicy(retryPolicy).build();

  • 代码示例
  public void createSession() {    //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//基本重试间隔时间,重试次数(每次重试时间加长)    //RetryPolicy retryPolicy = new RetryNTimes(5,1000);//重试次数,重试间隔时间    RetryPolicy retryPolicy = new RetryUntilElapsed(5000,1000);//重试时间,重试间隔时间    //curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);    curatorFramework = CuratorFrameworkFactory.builder()                .connectString("192.168.117.128:2181")                //.authorization() 设置访问权限 设置方法同原生API                .sessionTimeoutMs(5000).connectionTimeoutMs(5000)                .retryPolicy(retryPolicy).build();    curatorFramework.start();  }

 

4、创建节点

public void createNode() throws Exception {    createSession();    String path = curatorFramework.create()        .creatingParentsIfNeeded()//如果父节点没有自动创建        //.withACL()设置权限 权限创建同原生API        .withMode(CreateMode.PERSISTENT)//节点类型        .forPath("/note_curator/02", "02".getBytes());    System.out.println("path:"+path);  }

节点类型、权限设置详见2.1Zookeeper原生API使用

 

5、节点删除

 public void del() throws Exception {    createSession();    curatorFramework.delete()        .guaranteed()//保证机制,出错后后台删除 直到删除成功        .deletingChildrenIfNeeded()//删除当前节点下的所有节点,再删除自身        .forPath("/note_curator");  }

 

6、获取子节点

public void getChildren() throws Exception {    createSession();    List<String> children = curatorFramework.getChildren().forPath("/note_curator");    System.out.println(children);  }

 

7、获取节点信息

public void getData() throws Exception {    createSession();    Stat stat = new Stat();    byte[] u = curatorFramework.getData().storingStatIn(stat).forPath("/note_curator");    System.out.println(new String(u));    System.out.println(stat);  }

 

8、设置节点信息

public void setData() throws Exception {    createSession();    curatorFramework.setData()        //.withVersion(1) 设置版本号 乐观锁概念        .forPath("/note_curator/01", "shengke0815".getBytes());  }

 

9、是否存在节点

public void exists() throws Exception {    createSession();    Stat s = curatorFramework.checkExists().forPath("/note_curator");    System.out.println(s);  }

 

10、设置节点信息回调

 ExecutorService executorService = Executors.newFixedThreadPool(5);//线程池  @Test  public void setDataAsync() throws Exception {    createSession();    curatorFramework.setData().inBackground(new BackgroundCallback() {//设置节点信息时回调方法      @Override      public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {        System.out.println(curatorFramework.getZookeeperClient());        System.out.println(curatorEvent.getResultCode());        System.out.println(curatorEvent.getPath());        System.out.println(curatorEvent.getContext());      }    },"shangxiawen",executorService).forPath("/note_curator","sksujer0815".getBytes());    Thread.sleep(Integer.MAX_VALUE);  }

API:

inBackground(org.apache.curator.framework.api.BackgroundCallback backgroundCallback, java.lang.Object o, java.util.concurrent.Executor executor);

    • backgroundCallback:自定义BackgroundCallback
    •   o:上下文信息,回调方法中curatorEvent.getContext()可获取此信息
    •   executor:线程池

 

11、监听节点改变事件

public void nodeListen() throws Exception {    createSession();    final NodeCache cache = new NodeCache(curatorFramework,"/note_curator");    cache.start();    cache.getListenable().addListener(new NodeCacheListener() {      @Override      public void nodeChanged() throws Exception {        System.out.println(new String(cache.getCurrentData().getData()));        System.out.println(cache.getCurrentData().getPath());      }    });    Thread.sleep(Integer.MAX_VALUE);  }

 

12、监听子节点列表改变事件

public void nodeClildrenListen() throws Exception {    createSession();    final PathChildrenCache cache = new PathChildrenCache(curatorFramework,"/note_curator",true);    cache.start();    cache.getListenable().addListener(new PathChildrenCacheListener() {      @Override      public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {        switch (pathChildrenCacheEvent.getType()){          case CHILD_ADDED:            System.out.println("add children");            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));            break;          case CHILD_REMOVED:            System.out.println("remove children");            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));            break;          case CHILD_UPDATED:            System.out.println("update children");            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));            break;        }      }    });    Thread.sleep(Integer.MAX_VALUE);  }