1. 有些分布式系统是master-slave模式的,master是一个单节点,一旦master挂掉了整个集群就挂掉了,所以一般master都会有一个备份master-back,一旦master挂掉了,备份master就会顶上去

那么ZK是如何实现的呢?

前置条件:

统一的一个临时节点:TemporaryNode(/ds/TemporaryNode仅仅这样一个节点)

第一步:zk有这样一个持久节点/ds

第二步:master1和master2同时启动,同时向/ds这个持久节点申请创建临时子节点TemporaryNode(同一时间只有一个请求能够创建成功)。

如果master1创建成功,这个节点(TemporaryNode)就不允许master2创建(锁的机制)

master1的状态变为active,真正的master。路径:/ds/TemporaryNode

master2的状态变为standby(master-back)。

master2同时对节点/ds/TemporaryNode注册事件监听。

第三步:master1挂掉或者超过一定时间没有响应。TemporaryNode节点会被删除(master2注册的事件机制就会起作用),就会通知master2,master2就会创建临时节点/ds/TemporaryNode,同时修改状态为active。

备注:

假如master1并没有挂掉,只有由于网络延时导致,当网络顺畅的时候就会出现“脑裂”状态。都认为自己是active。出现两个master

解决脑裂的办法:对/ds/TemporaryNode加一个权限ACL控制(节点删除以后,权限同时也不在了)。master1对于这个节点/ds/TemporaryNode没有权限。自己把状态改成standby。

实际的案例1:Hadoop(NameNode、ResourceManager),普通的部署NameNode、ResourceManager仅仅是单节点。Hadoop HA(NameNode和ResourceManager有多个备份)

分布式锁主要用于在分布式环境中包括跨主机、跨进程、跨网络,导致共享资源不一致的问题,保证数据的一致性。

说明:

这种实现会有一个缺点,即当有很多进程在等待锁的时候,在释放锁的时候会有很多进程就过来争夺锁,这种现象称为 “惊群效应”

 

准备工作:

1)安装Zookeeper,具体参考我前面的我文章Zookeeper系列一:Zookeeper介绍、Zookeeper安装配置、ZK Shell的使用

2)新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>

实现逻辑参考“2. 分布式锁优化后的实现思路”中的流程图

  1. package com.study.demo.lock;
  2. import java.util.Collections;
  3. import java.util.List;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.Condition;
  7. import java.util.concurrent.locks.Lock;
  8. import org.I0Itec.zkclient.IZkDataListener;
  9. import org.I0Itec.zkclient.ZkClient;
  10. import org.I0Itec.zkclient.serialize.SerializableSerializer;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. *
  15. * @Description: Zookeeper分布式锁的核心代码实现
  16. * @author leeSmall
  17. * @date 2018年9月4日
  18. *
  19. */
  20. public class DistributedLock implements Lock {
  21. private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
  22. private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
  23. private static final String LOCK_PATH = "/LOCK";
  24. private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
  25. private CountDownLatch cdl;
  26. private String beforePath;// 当前请求的节点前一个节点
  27. private String currentPath;// 当前请求的节点
  28. // 判断有没有LOCK目录,没有则创建
  29. public DistributedLock() {
  30. if (!this.client.exists(LOCK_PATH)) {
  31. this.client.createPersistent(LOCK_PATH);
  32. }
  33. }
  34. public void lock() {
  35. //尝试去获取分布式锁失败
  36. if (!tryLock()) {
  37. //对次小节点进行监听
  38. waitForLock();
  39. lock();
  40. }
  41. else {
  42. logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
  43. }
  44. }
  45. public boolean tryLock() {
  46. // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
  47. if (currentPath == null || currentPath.length() <= 0) {
  48. // 创建一个临时顺序节点
  49. currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
  50. System.out.println("---------------------------->" + currentPath);
  51. }
  52. // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
  53. List<String> childrens = this.client.getChildren(LOCK_PATH);
  54. //由小到大排序所有子节点
  55. Collections.sort(childrens);
  56. //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
  57. if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
  58. return true;
  59. }
  60. //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
  61. else {
  62. int wz = Collections.binarySearch(childrens, currentPath.substring(6));
  63. beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
  64. }
  65. return false;
  66. }
  67. //等待锁,对次小节点进行监听
  68. private void waitForLock() {
  69. IZkDataListener listener = new IZkDataListener() {
  70. public void handleDataDeleted(String dataPath) throws Exception {
  71. logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
  72. if (cdl != null) {
  73. cdl.countDown();
  74. }
  75. }
  76. public void handleDataChange(String dataPath, Object data) throws Exception {
  77. }
  78. };
  79. // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
  80. this.client.subscribeDataChanges(beforePath, listener);
  81. if (this.client.exists(beforePath)) {
  82. cdl = new CountDownLatch(1);
  83. try {
  84. cdl.await();
  85. } catch (InterruptedException e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. this.client.unsubscribeDataChanges(beforePath, listener);
  90. }
  91. //完成业务逻辑以后释放锁
  92. public void unlock() {
  93. // 删除当前临时节点
  94. client.delete(currentPath);
  95. }
  96. // ==========================================
  97. public void lockInterruptibly() throws InterruptedException {
  98. }
  99. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  100. return false;
  101. }
  102. public Condition newCondition() {
  103. return null;
  104. }
  105. }
  1. package com.study.demo.lock;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.locks.Lock;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. /**
  7. *
  8. * @Description: 在业务里面使用分布式锁
  9. * @author leeSmall
  10. * @date 2018年9月4日
  11. *
  12. */
  13. public class OrderServiceImpl implements Runnable {
  14. private static OrderCodeGenerator ong = new OrderCodeGenerator();
  15. private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
  16. // 同时并发的线程数
  17. private static final int NUM = 10;
  18. // 按照线程数初始化倒计数器,倒计数器
  19. private static CountDownLatch cdl = new CountDownLatch(NUM);
  20. private Lock lock = new DistributedLock();
  21. // 创建订单接口
  22. public void createOrder() {
  23. String orderCode = null;
  24. //准备获取锁
  25. lock.lock();
  26. try {
  27. // 获取订单编号
  28. orderCode = ong.getOrderCode();
  29. } catch (Exception e) {
  30. // TODO: handle exception
  31. } finally {
  32. //完成业务逻辑以后释放锁
  33. lock.unlock();
  34. }
  35. // ……业务代码
  36. logger.info("insert into DB使用id:=======================>" + orderCode);
  37. }
  38. public void run() {
  39. try {
  40. // 等待其他线程初始化
  41. cdl.await();
  42. } catch (InterruptedException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. }
  46. // 创建订单
  47. createOrder();
  48. }
  49. public static void main(String[] args) {
  50. for (int i = 1; i <= NUM; i++) {
  51. // 按照线程数迭代实例化线程
  52. new Thread(new OrderServiceImpl()).start();
  53. // 创建一个线程,倒计数器减1
  54. cdl.countDown();
  55. }
  56. }
  57. }
  1. package com.study.demo.lock;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. public class OrderCodeGenerator {
  5. // 自增长序列
  6. private static int i = 0;
  7. // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
  8. public String getOrderCode() {
  9. Date now = new Date();
  10. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
  11. return sdf.format(now) + ++i;
  12. }
  13. }

 

版权声明:本文为leeSmall原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/leeSmall/p/9588533.html