你的位置:首页 > ASP.net教程

[ASP.net教程](原)3.2 Zookeeper应用


本文为原创文章,转载请注明出处,谢谢

数据的发布与订阅

1、应用

  服务端监听数据改变,客户端创建/更新节点数据,客户端提供数据,服务端处理

2、原理 

  • 客户端监控节点数据改变事件(例如配置信息,下图的config节点),启动时在服务器节点下创建临时节点(图中servers下节点)
  • 服务端监听工作服务器的子节点更新,触发自身存储的工作服务器列表,同时监听订阅节点的数据改变事件(下图中command节点)

3、架构图

 

  • config:配置信息节点
  • servers:服务器列表父节点
  • command:数据订阅节点

 

4、客户端流程图

5、服务端流程图

 

 6、核心类关系图

  • SubscribeClient:模拟服务端、客户端启动
  • WorkServer:客户端
  • ManageServer:服务端
  • ServerConfig:配置信息
  • ServerData:server数据

7、核心代码

  • WorkServer 监听
    public WorkServer(String serverPath, String configPath, final ServerData serverData, ServerConfig serverConfig, ZkClient zkClient) {    this.serverPath = serverPath;    this.configPath = configPath;    this.serverData = serverData;    this.serverConfig = serverConfig;    this.zkClient = zkClient;    dataListener = new IZkDataListener() {      @Override      public void handleDataChange(String s, Object o) {        try{          String data = new String((byte[])o);          System.out.println(data);          ServerConfig config = (ServerConfig)JSON.parseObject(data,ServerConfig.class);          updateConfig(config);          System.out.println("server name:"+serverData.getName()+" update config:"+config.toString());        }catch (Exception e) {          e.printStackTrace();        }      }      @Override      public void handleDataDeleted(String s) throws Exception {      }    };  }

  • WorkServer 注册
    private void registerMe() {    String myPath = serverPath.concat("/").concat(serverData.getAddress());    try{      if(!zkClient.exists(myPath))        zkClient.createEphemeral(myPath,JSON.toJSONString(serverData).getBytes());    }catch (ZkNoNodeException e ) {      zkClient.createPersistent(serverPath, true);      registerMe();    }  }

    ps:此操作是在servers节点下创建节点,需要servers节点已存在

  • ManageServer 监听
     public ManageServer(String serverPath, String configPath, String cmdPath, ServerConfig serverConfig, ZkClient zkClient) {    this.serverPath = serverPath;    this.configPath = configPath;    this.cmdPath = cmdPath;    this.serverConfig = serverConfig;    this.zkClient = zkClient;    childListener = new IZkChildListener() {      @Override      public void handleChildChange(String s, List<String> strings) throws Exception {        workServerList = strings;        System.out.println("----"+workServerList.toString());      }    };    dataListener = new IZkDataListener() {      @Override      public void handleDataChange(String s, Object o) throws Exception {        String cmd = new String((byte[])o);        System.out.println("cmd="+cmd);        exeCmd(cmd);      }      @Override      public void handleDataDeleted(String s) throws Exception {      }    };  }

    • childListener :监听Servers下的节点变化
    • dataListener :监听command节点的数据变化
  • ManageServer 执行操作
    /**   *模拟命令:1、list 2、create 3、modify   */  private void exeCmd(String cmd) {    if("list".equals(cmd)) {      System.out.println(workServerList.toString());    }else if("create".equals(cmd)) {      exeCreate();    }else if("modify".equals(cmd)) {      exeModify();    }else {      System.out.println("this cmd can not exe");    }  }

  • SubscribeClient
    public class SubscribeClient {  private static final int CLIENT_QTY =3;  private static final String ZOOKEEPER_URL = "192.168.117.128:2181";  private static final String SERVERPATH = "/servers";  private static final String CONFIGPATH = "/config";  private static final String CMDPATH = "/command";  @Test  public void testSubScribe() throws IOException {    ServerConfig config = new ServerConfig("DBURL...","DBUSER...","DBPAW...");    ZkClient zk = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer());    ManageServer manageServer = new ManageServer(SERVERPATH,CONFIGPATH,CMDPATH,config,zk);    manageServer.start();    for (int i = 0; i < CLIENT_QTY; i++) {      ZkClient zkq = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer());      ServerData data = new ServerData("address"+i,i+"","name_"+i);      WorkServer server = new WorkServer(SERVERPATH,CONFIGPATH,data,config,zkq);      server.start();    }    new BufferedReader(new InputStreamReader(System.in)).readLine();  }}