v248575b089b95dca2cf29153621e9ab3b1200x500.jpg

Dubbo 通过注册中心在分布式环境中实现服务的注册与发现,而注册中心通常采用 ZooKeeper,研究注册中心相关源码绕不开 ZooKeeper,所以学习了 ZooKeeper 的基本概念以及相关 API 操作。

客户端与服务端采用 TCP 长连接,服务端在为客户端创建 Session 会分配一个唯一 sessionId。在 Session timeout 时间内,客户端可以向服务端发送请求以及接受 watcher 事件通知。

Zookeeper 将所有数据存储在内存中,数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个Znode,例如/foo/path1。

null

Znode 将会保存数据内容以及相关属性信息。在 Znode 中使用 Stat 数据结保存相关属性信息。Stat 属性中有三种版本信息,分别为 version:当前节点版本信息,cversion:当前节点子节点版本,aversion 当前节点的 ACL 版本。每次发生改动,版本数值将会单调递增。

更新,删除 Znode 可以传入版本数值,如果版本数值不对,将会导致删除/更新失败,这个特性类似于 CAS 操作。

Znode 有以下几种类型:

  1. 永久节点

一旦创建,将会一直存在,除非手动删除。dubbo 目录节点为永久节点。

  1. 临时节点

临时节点基于客户端 Session,Session 有效期内将会一直存在,Session 失效,节点将会自动删除。

利用这个机制,Dubbo 服务者创建的节点就是临时节点。如果 Dubbo 服务者程序意外宕机,在 Session 超时之后,也能自动删除服务节点,自动下线有问题的服务。

3 顺序节点

创建顺序节点将会自动在名字后追加整形数字,默认长度为 10 位。顺序节点也分为永久与临时。

利用临时顺序节点,我们可以用来实现分布式锁 七张图彻底讲清楚ZooKeeper分布式锁的实现原理【石杉的架构笔记】

客户端可以在指定节点注册监听器(Watcher),在触发特定事件后,ZooKeeper 服务端会将事件通知到客户端。在 Dubbo 中消费者基于 watcher 机制可以动态感知到新的服务者加入。

ZooKeeper 可以在三种请求中设置监听,分别为:

  • getData(),获取节点数据
  • getChildren() 获取子节点
  • exists() 判断节点是否存在

通知事件类型分为,增删改事件,以及子节点变动事件。

需要注意的是,watcher 通知过一次之后将会失效,若想继续监听通知,需要重新注册。

ZooKeeper 官方提供 Java API 实现,提供相关操作的方法。

  1. ZooKeeper zk=new ZooKeeper("127.0.0.1:2181", 150000, new Watcher() {
  2. @Override
  3. public void process(WatchedEvent watchedEvent) {
  4. System.out.println("已经触发了" + watchedEvent.getType() + "事件"+watchedEvent);
  5. }
  6. });

创建连接需要传入 ZooKeeper 服务端地址,然后设定 session 超时时间,另外还需要创建一个 Watcher,用于监听连接事件。建立连接之后,就可以使用该客户端操作。

  1. // 创建永久节点,需要传入 ACL 权限列表,以及指定节点类型
  2. zk.create("/test","test".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);
  3. // 修改节点值。更新节点值需要传入节点的版本,如果版本与服务端版本不一致,更新失败,类似 CAS 机制。-1 代表不比较节点版本
  4. zk.setData("/test","test1".getBytes(),-1);
  5. // 删除节点.删除节点也需要传入节点版本
  6. zk.delete("/test",-1);
  7. // 创建临时节点
  8. zk.create("/ephemeral","ephemeral".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);

ZooKeeper 客户端相关 CRUD 操作如上。可以看到相关操作比较繁琐,需要传入参数较多。

  1. // 在 exists 注册 watcher,创建节点,删除节点,改变节点将会触发回调
  2. zk.exists("/test", new Watcher() {
  3. @Override
  4. public void process(WatchedEvent event) {
  5. System.out.println("回调实例,类型为:"+event.getType());
  6. }
  7. });
  8. // 获取节点数据,可以注册 watcher,删除节点以及改变节点数据可以触发回调
  9. zk.getData("/test", new Watcher() {
  10. @Override
  11. public void process(WatchedEvent event) {
  12. System.out.println("回调实例,类型为:"+event.getType());
  13. }
  14. },new Stat());
  15. // 获取子节点,注册 watcher,一级子节点变动后将会触发回调
  16. zk.getChildren("/test", new Watcher() {
  17. @Override
  18. public void process(WatchedEvent event) {
  19. System.out.println("回调实例,类型为:"+event.getType());
  20. }
  21. });

ZooKeeper API 可以为三种操作注册 watcher,一旦相关节点变动将会触发事件通知。

从上面代码示例可以看到 ZooKeeper 提供 API 比较复杂且难用。可以使用 Curator 或者 zkclient 这种第三方框架代替原生 API。这类框架封装 ZooKeeper 原生 API,抽象化相关接口,简化操作难度。

dubbo 抽象相关 ZooKeeper 操作,并分别使用 Curator 或者 zkclien 实现。在 dubbo 2.6.1 版本之后将会默认使用 Curator,之前版本默认使用 zkclient 。

下面我们使用 Curator 操作 ZooKeeper 。

  1. // 设置重试策略
  2. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  3. // 默认 session 超时时间 60 s
  4. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);

Curator 创建连接与原生 API 大致相关,不过需要设置重试策略,第一次连接失败,Curator 可以重新尝试连接,直到超过最大连接次数。

  1. // 创建目录节点
  2. client.create().forPath("/test", "123456789".getBytes());
  3. // 创建普通节点
  4. client.create().forPath("/test/normal", "123121".getBytes());
  5. // 修改普通节点内容
  6. client.setData().forPath("/test/normal", "1121231231".getBytes());
  7. // 删除节点
  8. client.delete().forPath("/test/normal");
  9. // 创建临时节点
  10. client.create().withMode(CreateMode.EPHEMERAL).forPath("/ephemeral");
  11. // 创建永久顺序节点
  12. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/sequential");
  13. // 获取所有子节点
  14. List<String> nodes = client.getChildren().forPath("/parent");

Curator CRUD 操作比较简单,无需设置相关属性参数。

Curator 相关监听 API 封装 zookeeper 原生API,内部增加重复注册等功能,从而使监听可以重复使用。

Curator 存在三种类型 API。

  • NodeCache:针对节点增删改操作。
  • PathChildrenCache:针对节点一级目录下节点增删改监听
  • TreeCache:结合 NodeCachePathChildrenCache 操作,不仅可以监听当前节点,还可以监听节点下任意子节点(支持多级)变动。
  1. // `NodeCache`使用方式
  2. NodeCache nodeCache=new NodeCache(client,"/test1",false);
  3. nodeCache.getListenable().addListener(new NodeCacheListener() {
  4. @Override
  5. public void nodeChanged() throws Exception {
  6. System.out.println("当前节点:"+nodeCache.getCurrentData());
  7. }
  8. });
  9. nodeCache.start();
  10. // PathChildrenCache 使用方式
  11. PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/test2",false);
  12. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  13. @Override
  14. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  15. System.out.println(event);
  16. }
  17. });
  18. pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
  19. System.out.println("注册watcher成功...");
  20. // TreeCache 使用方式
  21. TreeCache treeCache=new TreeCache(client,"/tree");
  22. treeCache.getListenable().addListener(new TreeCacheListener() {
  23. @Override
  24. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  25. System.out.println(event);
  26. }
  27. });
  28. treeCache.start();
  29. System.out.println("注册watcher成功...");

其他平台.png

版权声明:本文为goodAndyxublog原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/goodAndyxublog/p/11402280.html