[TOC]


背景

平台支持对服务的安装,更新,启停,备份,卸载,查询等操作,由平台发起命令,agent执行任务,由于agent是多线程并发处理任务,试想一下,多用户对主机上同一服务的进行操作,例如同时卸载和备份,会出现什么可怕的情况。老版本的所有操作,都是人工通过简单if else状态判断,没有从根本上发现问题的本质,并且耦合了业务属性,没有对其进行足够的抽象。

主要名词解释

英文名称或缩写
curator
zookeeper
平台
agent

方案设计

平台锁提供了两种实现方式,一种是基于jdk原生的ReentrantLock,另一种是基于zookeeper的分布式锁,两种方式更有优劣,原生方式的好处是不需要引入第三方组件,由自己在内存中维护锁,zookeeper好处是如果未来平台支持集群,那么锁服务可以无缝衔接。

zookeeper分布式锁

zookeeper分布式锁实现可以参考《从Paxos到Zookeeper分布式一致性原理与实现》里面的分布式锁,也可以直接用curator,封装的很好,里面有实现。

排它锁(截图来自从Paxos到zookeeper)

这里写图片描述

共享锁 (截图来自从Paxos到zookeeper)

这里写图片描述

这里写图片描述

锁的设计是针对一个项目下单台机器上的单台服务,这样既能保证锁的细度,又能保证agent并发操作多服务。锁的路径标记形如“/projectId/sericeId/serviceName”的形式(模仿zookeeper的树形结构), 通常情况下 projectId和serviceId就能确定一个唯一服务,加上serviceName可以作为调试信息。锁路径的生成由统一的构造器LockPathBuilder生成,保证形式统一。

ReentrantServiceLock由自己编写一个key:value 的map,来维护锁的信息,zookeeper则不需要为此考虑。

类设计

锁的设计采用工厂方法设计模式,通过工厂接口和锁接口面向接口开发,可方便扩展其他锁的实现方式,利用统一的LockPathBuilder来生成锁的路径,保证路径的规范统一,借助于javaSPI机制,可以在不修改代码的情况下替换应用默认的锁实现方式。

这里写图片描述

流程图

  • 构建当前操作服务的锁的路径
  • 获取锁
  • 上锁
  • 对agent发命令进行操作
  • 解锁
    这里写图片描述

使用实例

        String lockPath = LockPathBuilder.newLock().withProjectId(service.getProjectId())
                .withServiceName(service.getAbbreviation())
                .withServiceId(serviceId)
                .build();   //lockPath创建

        //删除备份所需要的参数
        param.put("serviceName", service.getAbbreviation());
        param.put("backupPath", path);
        ServiceLock serviceLock = SERVICE_LOCK_FACTORY.getLock(lockPath);//获取此服务锁
        serviceLock.lock();  //上锁
        //对此agent发送命令
        Future future = agentService.synPushTask(ip, ScriptConstant.remove_service_backup, param, UUID.randomUUID().toString(), AgentResultMsg.class);
        AgentResultMsg result = null;
        try {
            result = (AgentResultMsg) future.get(20, TimeUnit.SECONDS);//等待结果时间
        } catch (TimeoutException e) {
            restResult.setSuccess(false);
            restResult.setMessage("删除备份超时");
        } catch (Exception e) {
           logger.error(e);
        } finally {
            //释放锁
            serviceLock.unlock();
        }

上述过程有个隐含的步骤,那就是一定要有超时机制,不然agent挂掉,一直不返回数据,会造成死锁,平台在这一版本中支持了同步调用agent,保证能在有限的时间内完成调用,并在同一个线程中释放锁。

关于同步调用如何实现会在以后的案例中分析。

测试用例

/**
 * Created by liuroy on 2017/9/25.
 */
public class ReentrantServiceLockTest {

    @Test
    public void testServiceLock() {
        ServiceLockFactory lockFactory = SpiServiceFactory.getService(ServiceLockFactory.class);
        Runnable task1 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("test1");
                    lock.lock();
                    System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
                    Thread.sleep(3000);
                    System.out.println("Thread " + Thread.currentThread().getId() + " running");
                    System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };

        new Thread(task1).start();
        Runnable task2 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("test1");
                    if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
                        System.out.println("Thread " + Thread.currentThread().getId() + " unget lock");
                    }
                    lock.lock();
                    System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
                    System.out.println("Thread " + Thread.currentThread().getId() + " running");
                    Thread.sleep(3000);
                    System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };

        Runnable task3 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("test1");
                    lock.lock();
                    System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
                    System.out.println("Thread " + Thread.currentThread().getId() + " running");
                    Thread.sleep(4000);
                    System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };

        new Thread(task2).start();
        new Thread(task3).start();


        try {
            Thread.sleep(20000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}
/**
 * Created by liuroy on 2017/9/25.
 */
public class ZookeeperServiceLockTest {

    private  ServiceLockFactory lockFactory;

    @Before
    public void init() {
        lockFactory = new ZookeeperServiceLockFactory();
    }

    @Test
    public void testServiceLock() {
        Runnable task1 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("testmmm/test");
                    lock.lock();
                    System.out.println("Thread1 getted lock");
                    Thread.sleep(3000);
                    System.out.println("Thread1 running");
                    System.out.println("Thread1 run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };

        new Thread(task1).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Runnable task2 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("testmmm/test");
                    if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
                        System.out.println("Thread2 unget lock");
                    }
                    lock.lock();
                    System.out.println("Thread2 getted lock");
                    System.out.println("Thread2 running");
                    Thread.sleep(3000);
                    System.out.println("Thread2 run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };

        Runnable task3 = new Runnable() {
            public void run() {
                ServiceLock lock = null;
                try {
                    lock = lockFactory.getLock("testmmm/test");
                    lock.lock();
                    System.out.println("Thread3 getted lock");
                    System.out.println("Thread3 running");
                    Thread.sleep(4000);
                    System.out.println("Thread3 run oven");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null)
                        lock.unlock();
                }
            }
        };
        new Thread(task3).start();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(task2).start();



        try {
            Thread.sleep(20000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }

    @After
    public void fini() {
    }
}

版权声明:本文为liuroy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:http://www.cnblogs.com/liuroy/p/8029366.html