前言:马上要过年了,祝大家新年快乐!在过年回家前分享一篇关于Zookeeper的文章,我们都知道现在微服务盛行,大数据、分布式系统中经常会使用到Zookeeper,它是微服务、分布式系统中必不可少的分布式协调框架。它的作用体现在分布式系统中解决了配置中心的问题,以及解决了在分布式环境中不同进程之间争夺资源的问题,也就是分布式锁的功能以及分布式消息队列功能等等。所以在微服务的环境中Zookeeper是现在很多公司首选的分布式协调框架,包括我之前的公司也在使用Zookeeper。说了这么多,没别的就是想说一下Zookeeper的重要性,废话不多说,进入正题。本篇博客只是演示在.Net Core 环境中如何使用Zookeeper组件进行基本的增删改查和一些注意的要点,如果对Zookeeper还不是太了解的话,建议认认真真、仔仔细细地阅读该文章:http://www.cnblogs.com/sunddenly/p/4033574.html   否则可能下面演示的你会看不懂。

 

一、Zookeeper基本概念快速介绍

概念:

Zookeeper是一个开源的分布式协调框架,它具有高性能 、高可用的特点,同时具有严格的顺序访问控制能力(主要是写操作的严格顺序性),基于对ZAB(Zookeeper原子消息广播协议)的实现,它能够很好的保证分布式环境下的数据一致性。也正是基于这样的特征,使得Zookeeper称为解决分布式数据一致性问题的利器,Zookeeper由两部分组成:Zookeeper服务端和客户端。

特点:

  • 全局一致性:每个server保存一份相同的数据副本,client无论链接哪个server,展示的数据都是一致的,这是最重要的特征。
  • 可靠性:如果消息其中一台服务器接受,那么将被所有的服务器接受。
  • 顺序性:包括全局有序性和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
  • 数据更新原子性:一次数据更新要么成功,要么失败,不存在中间状态。
  • 实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失败的信息。

数据结构:

图片来源:(https://www.cnblogs.com/xums/p/7074008.html)

  • Zookeeper的数据结构模型采用类似于文件系统的树结构。树上的每个节点称为ZNode,而每个节点都可能有一个或者多个子节点。ZNode的节点路径标识方式是由一系列斜杠”/”进行分割的路径表示,必须是绝对路径。既可以向ZNode节点写入、修改和读取数据,也可以创建、删除ZNode节点或ZNode节点下的子节点。
  • 值的注意的是,Zookeeper的设计目标不是传统的数据库存储或大数据对象存储,而是协同数据的存储,因此在实现的时候,ZNode存储的数据大小不应该超过1MB。另外,每一个节点都有一个ACL(访问控制列表),据此控制该节点的访问权限。
  • ZNode数据节点是有生命周期的,其生命周期的长短取决于数据节点的节点类型。节点类型共有四种:持久节点、持久顺序节点、临时节点、临时顺序节点

 

好了,基本的概念就聊到这里,先有一个印象,如果需要详细的学习,建议认认真真阅读这篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就开始演示基本的api操作。

 

二、ASP.Net Core 中使用ZooKeeper

 首先,添加下面的依赖包:

 

新建一个.Net Core的控制台应用:

Zookeeper的服务端使用的是张辉清老师新书《中小研发团队架构实践》里面的服务,我这里不再安装Zookeeper服务端,只是介绍一下Zookeeper的目录结构

  • Zookeeper目录介绍

(1)bin:主要的一些运行命令

(2)conf:存放配置文件,其中我们需要修改zk.cfg

(3)contrib:附加的一些功能

(4)dist-maven:mvn编译后的目录

(5)docs:文档

(6)lib:需要依赖的jar包

配置文件zk.cfg文件内容介绍(单机版)

(1)trickTime:用于计算的时间单元,比如session超时:N*trickTime

(2)initLimit:用于集群,允许从节点链接并同步到master节点的初始化链接时间,以trickTime的倍数来表示

(3)syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制)

(4)dataDir:必须配置

(5)dataLogDir:日志目录,如果不配置会和dataDir公用

(6)clientPort:链接服务器的端口,默认是2181

好了就介绍到这里,下面让我会演示关于Zookeeper  API的各种操作。

  • 如何连接Zookeeper的服务端

(1)代码如下:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置项
        public string QueryPath { get; set; }= "/Configuration";
        //节点状态信息
        public Stat Stat { get; set; }

        // 配置数据
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

            Console.WriteLine("客户端开始连接zookeeper服务器...");
            Console.WriteLine($"连接状态:{ZK.getState()}");
            Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题
            Console.WriteLine($"连接状态:{ZK.getState()}");
        }

        // 读取节点的配置数据
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

    }
}

解释:

 首先,我们来看看创建Zookeeper对象时,应该注意的问题:

Zookeeper的构造函数参数解释如下:

客户端和zk服务端链接是一个异步的过程,当连接成功后后,客户端会收的一个watch通知,就是调用回调函数:ConfigServiceWatcher.process(WatchedEvent @event)注意这个类ConfigServiceWatcher必须要继承Watcher,重写 process(WatchedEvent @event),所以就打印出了。关于Zookeeper的watcher后面会详细介绍,不明白的不要紧,后面会通过代码给大家演示。

(1)connectString:连接服务器的ip字符串,比如: “192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181″可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群,也可以在ip后加路径。

(2)sessionTimeout:超时时间,心跳收不到了,那就超时

(3)watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null,在上面的演示中,我们设置了一个watcher。

(4)canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用。

(5)sessionId:会话的id

(6)sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话。

好了,基本的参数已经介绍完毕,那么,来解释一下为什么在创建Zookeeper对象时添加下面这句代码:

其实上面我已经解释了,由于客户端和zk服务端链接是一个异步的过程,需要一定的时间间隔,所以,如果不添加效果这样:

 

(2)zookeeper 恢复之前的会话连接演示

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置项
        public string QueryPath { get; set; }= "/Configuration";
        //节点状态信息
        public Stat Stat { get; set; }

        // 配置数据
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 读取节点的配置数据
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }
    }
}

 

  •  ZNode创建删除修改查询

代码:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置项
        public string QueryPath { get; set; }= "/Configuration";
        //节点状态信息
        public Stat Stat { get; set; }

        // 配置数据
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 读取节点的配置数据
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        // 关闭ZooKeeper连接
        // 释放资源
        public async Task Close()
        {
            if (this.ZK != null)
            {
               await ZK.closeAsync();
            }

            this.ZK = null;
        }


        
    }
}
using org.apache.zookeeper;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    class Program
    {
        public const int timeout = 5000;
        static async Task Main(string[] args)
        {
            var conf = new ZookeeperClient("", timeout);

            try
            {
                conf.QueryPath = "/UserName";

                Console.WriteLine("客户端开始连接zookeeper服务器...");
                Console.WriteLine($"连接状态:{conf.ZK.getState()}");
                Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题
                Console.WriteLine($"连接状态:{conf.ZK.getState()}");

                if (await conf.ZK.existsAsync(conf.QueryPath, false) == null)
                {
                    conf.ConfigData = Encoding.Default.GetBytes("guozheng");
                    await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }

                string configData = await conf.ReadConfigDataAsync();
                Console.WriteLine("节点【{0}】目前的值为【{1}】。", conf.QueryPath, configData);
                Console.ReadLine();


                Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF);
                conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100)));

                await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1);

                Console.WriteLine("节点【{0}】的值已被修改为【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData));

                Console.ReadLine();

                if (await conf.ZK.existsAsync(conf.QueryPath, false) != null)
                {
                    await conf.ZK.deleteAsync(conf.QueryPath, -1);

                    Console.WriteLine("已删除此【{0}】节点。{1}", conf.QueryPath, Environment.NewLine);
                }

            }
            catch (Exception ex)
            {
                if (conf.ZK == null)
                {
                    Console.WriteLine("已关闭ZooKeeper的连接。");
                    Console.ReadLine();
                    return;
                }

                Console.WriteLine("抛出异常:{0}【{1}】。", Environment.NewLine, ex.ToString());
            }
            finally
            {
                await conf.Close();
                Console.WriteLine("已关闭ZooKeeper的连接。");
                Console.ReadLine();
            }



            ////开始会话重连
            //Console.WriteLine("开始会话重连...");

            //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword);

            //Console.WriteLine(conf2.ZK.getSessionId());
            //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd()));

            //Console.WriteLine($"重新连接状态zkSession:{conf2.ZK.getState()}");
            //Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题
            //Console.WriteLine($"重新连接状态zkSession:{conf2.ZK.getState()}");


            Console.ReadKey();
        }
    }
}

 

 解释:

关于异步创建节点的方法,是不支持子节点的递归创建,参数介绍:

(1)path:创建的路径

(2)data:存储的数据的byte[]

(3)acl:控制权限策略   Ids.OPEN_ACL_UNSAFE –> world:anyone:cdrwa      CREATOR_ALL_ACL –> auth:user:password:cdrwa

(4)createMode: 节点类型, 是一个枚举    PERSISTENT:持久节点   PERSISTENT_SEQUENTIAL:持久顺序节点   EPHEMERAL:临时节点   EPHEMERAL_SEQUENTIAL:临时顺序节点

 关于上面参数引出来的知识点,需要几章来讲解,本篇文章先不介绍,后面会介绍。好了,关于.Net Core中使用Zookeeper的介绍就到这里,关于上面演示的结果,我先抛出一个问题,大家可以思考一下:为什么“Zookeeper链接成功:True”会输出多次?也就是我们下节要讨论的Zookeeper的watcher机制。时间到了,收拾行李,准备一下回家啦,先写到这里,祝大家新年快乐!希望对你有帮助,过完年来见!

 

三、总结

 可能有些地方解释的不是太清楚,大家多多见谅,有些的不对的地方,希望能指正出来。

说明:演示代码里面使用的Zookeeper服务过一段时间能用,不能用的话,在评论区留言,后面用阿里云自己搭建一个。

 代码地址:

 https://github.com/guozheng007/ZookeeperNetCoreDemo

 

 

参考资料:

(1)张辉清:《中小研发团队架构实践》

(2) 风间影月:《ZooKeeper分布式专题与Dubbo微服务入门》

 (3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html

作者:郭峥

出处:http://www.cnblogs.com/runningsmallguo/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

版权声明:本文为runningsmallguo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/runningsmallguo/p/10340343.html