Zookeeper系列四:Zookeeper在大型分布式系统中的应用、Zookeeper实现分布式锁
一、Zookeeper在大型分布式系统中的应用
1. 有些分布式系统是master-slave模式的,master是一个单节点,一旦master挂掉了整个集群就挂掉了,所以一般master都会有一个备份master-back,一旦master挂掉了,备份master就会顶上去
那么ZK是如何实现的呢?
前置条件:
统一的一个临时节点:TemporaryNode(/ds/TemporaryNode仅仅这样一个节点)
第一步:zk有这样一个持久节点/ds
第二步:master1和master2同时启动,同时向/ds这个持久节点申请创建临时子节点TemporaryNode(同一时间只有一个请求能够创建成功)。
如果master1创建成功,这个节点(TemporaryNode)就不允许master2创建(锁的机制)
master1的状态变为active,真正的master。路径:/ds/TemporaryNode
master2的状态变为standby(master-back)。
master2同时对节点/ds/TemporaryNode注册事件监听。
第三步:master1挂掉或者超过一定时间没有响应。TemporaryNode节点会被删除(master2注册的事件机制就会起作用),就会通知master2,master2就会创建临时节点/ds/TemporaryNode,同时修改状态为active。
备注:
假如master1并没有挂掉,只有由于网络延时导致,当网络顺畅的时候就会出现“脑裂”状态。都认为自己是active。出现两个master
解决脑裂的办法:对/ds/TemporaryNode加一个权限ACL控制(节点删除以后,权限同时也不在了)。master1对于这个节点/ds/TemporaryNode没有权限。自己把状态改成standby。
实际的案例1:Hadoop(NameNode、ResourceManager),普通的部署NameNode、ResourceManager仅仅是单节点。Hadoop HA(NameNode和ResourceManager有多个备份)
二、Zookeeper实现分布式锁
分布式锁主要用于在分布式环境中包括跨主机、跨进程、跨网络,导致共享资源不一致的问题,保证数据的一致性。
1. 分布式锁的实现思路
说明:
这种实现会有一个缺点,即当有很多进程在等待锁的时候,在释放锁的时候会有很多进程就过来争夺锁,这种现象称为 “惊群效应”
2. 分布式锁优化后的实现思路
3. Zookeeper分布式锁的代码实现
准备工作:
1)安装Zookeeper,具体参考我前面的我文章Zookeeper系列一:Zookeeper介绍、Zookeeper安装配置、ZK Shell的使用
2)新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.10</version>
- </dependency>
3.1 Zookeeper分布式锁的核心代码实现
实现逻辑参考“2. 分布式锁优化后的实现思路”中的流程图
- package com.study.demo.lock;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.serialize.SerializableSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- *
- * @Description: Zookeeper分布式锁的核心代码实现
- * @author leeSmall
- * @date 2018年9月4日
- *
- */
- public class DistributedLock implements Lock {
- private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
- private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
- private static final String LOCK_PATH = "/LOCK";
- private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
- private CountDownLatch cdl;
- private String beforePath;// 当前请求的节点前一个节点
- private String currentPath;// 当前请求的节点
- // 判断有没有LOCK目录,没有则创建
- public DistributedLock() {
- if (!this.client.exists(LOCK_PATH)) {
- this.client.createPersistent(LOCK_PATH);
- }
- }
- public void lock() {
- //尝试去获取分布式锁失败
- if (!tryLock()) {
- //对次小节点进行监听
- waitForLock();
- lock();
- }
- else {
- logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
- }
- }
- public boolean tryLock() {
- // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
- if (currentPath == null || currentPath.length() <= 0) {
- // 创建一个临时顺序节点
- currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
- System.out.println("---------------------------->" + currentPath);
- }
- // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
- List<String> childrens = this.client.getChildren(LOCK_PATH);
- //由小到大排序所有子节点
- Collections.sort(childrens);
- //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
- if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
- return true;
- }
- //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
- else {
- int wz = Collections.binarySearch(childrens, currentPath.substring(6));
- beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
- }
- return false;
- }
- //等待锁,对次小节点进行监听
- private void waitForLock() {
- IZkDataListener listener = new IZkDataListener() {
- public void handleDataDeleted(String dataPath) throws Exception {
- logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
- if (cdl != null) {
- cdl.countDown();
- }
- }
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- };
- // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
- this.client.subscribeDataChanges(beforePath, listener);
- if (this.client.exists(beforePath)) {
- cdl = new CountDownLatch(1);
- try {
- cdl.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- this.client.unsubscribeDataChanges(beforePath, listener);
- }
- //完成业务逻辑以后释放锁
- public void unlock() {
- // 删除当前临时节点
- client.delete(currentPath);
- }
- // ==========================================
- public void lockInterruptibly() throws InterruptedException {
- }
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
- public Condition newCondition() {
- return null;
- }
- }
3.2 在业务里面使用分布式锁
- package com.study.demo.lock;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.locks.Lock;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- *
- * @Description: 在业务里面使用分布式锁
- * @author leeSmall
- * @date 2018年9月4日
- *
- */
- public class OrderServiceImpl implements Runnable {
- private static OrderCodeGenerator ong = new OrderCodeGenerator();
- private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
- // 同时并发的线程数
- private static final int NUM = 10;
- // 按照线程数初始化倒计数器,倒计数器
- private static CountDownLatch cdl = new CountDownLatch(NUM);
- private Lock lock = new DistributedLock();
- // 创建订单接口
- public void createOrder() {
- String orderCode = null;
- //准备获取锁
- lock.lock();
- try {
- // 获取订单编号
- orderCode = ong.getOrderCode();
- } catch (Exception e) {
- // TODO: handle exception
- } finally {
- //完成业务逻辑以后释放锁
- lock.unlock();
- }
- // ……业务代码
- logger.info("insert into DB使用id:=======================>" + orderCode);
- }
- public void run() {
- try {
- // 等待其他线程初始化
- cdl.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- // 创建订单
- createOrder();
- }
- public static void main(String[] args) {
- for (int i = 1; i <= NUM; i++) {
- // 按照线程数迭代实例化线程
- new Thread(new OrderServiceImpl()).start();
- // 创建一个线程,倒计数器减1
- cdl.countDown();
- }
- }
- }
- package com.study.demo.lock;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- public class OrderCodeGenerator {
- // 自增长序列
- private static int i = 0;
- // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
- public String getOrderCode() {
- Date now = new Date();
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
- return sdf.format(now) + ++i;
- }
- }