部署平台分布式锁设计
[TOC]
背景
平台支持对服务的安装,更新,启停,备份,卸载,查询等操作,由平台发起命令,agent执行任务,由于agent是多线程并发处理任务,试想一下,多用户对主机上同一服务的进行操作,例如同时卸载和备份,会出现什么可怕的情况。老版本的所有操作,都是人工通过简单if else状态判断,没有从根本上发现问题的本质,并且耦合了业务属性,没有对其进行足够的抽象。
主要名词解释
英文名称或缩写 |
---|
curator |
zookeeper |
平台 |
agent |
方案设计
平台锁提供了两种实现方式,一种是基于jdk原生的ReentrantLock,另一种是基于zookeeper的分布式锁,两种方式更有优劣,原生方式的好处是不需要引入第三方组件,由自己在内存中维护锁,zookeeper好处是如果未来平台支持集群,那么锁服务可以无缝衔接。
zookeeper分布式锁
zookeeper分布式锁实现可以参考《从Paxos到Zookeeper分布式一致性原理与实现》里面的分布式锁,也可以直接用curator,封装的很好,里面有实现。
排它锁(截图来自从Paxos到zookeeper)
共享锁 (截图来自从Paxos到zookeeper)
锁的设计是针对一个项目下单台机器上的单台服务,这样既能保证锁的细度,又能保证agent并发操作多服务。锁的路径标记形如“/projectId/sericeId/serviceName”的形式(模仿zookeeper的树形结构), 通常情况下 projectId和serviceId就能确定一个唯一服务,加上serviceName可以作为调试信息。锁路径的生成由统一的构造器LockPathBuilder生成,保证形式统一。
ReentrantServiceLock由自己编写一个key:value 的map,来维护锁的信息,zookeeper则不需要为此考虑。
类设计
锁的设计采用工厂方法设计模式,通过工厂接口和锁接口面向接口开发,可方便扩展其他锁的实现方式,利用统一的LockPathBuilder来生成锁的路径,保证路径的规范统一,借助于javaSPI机制,可以在不修改代码的情况下替换应用默认的锁实现方式。
流程图
- 构建当前操作服务的锁的路径
- 获取锁
- 上锁
- 对agent发命令进行操作
- 解锁
使用实例
String lockPath = LockPathBuilder.newLock().withProjectId(service.getProjectId())
.withServiceName(service.getAbbreviation())
.withServiceId(serviceId)
.build(); //lockPath创建
//删除备份所需要的参数
param.put("serviceName", service.getAbbreviation());
param.put("backupPath", path);
ServiceLock serviceLock = SERVICE_LOCK_FACTORY.getLock(lockPath);//获取此服务锁
serviceLock.lock(); //上锁
//对此agent发送命令
Future future = agentService.synPushTask(ip, ScriptConstant.remove_service_backup, param, UUID.randomUUID().toString(), AgentResultMsg.class);
AgentResultMsg result = null;
try {
result = (AgentResultMsg) future.get(20, TimeUnit.SECONDS);//等待结果时间
} catch (TimeoutException e) {
restResult.setSuccess(false);
restResult.setMessage("删除备份超时");
} catch (Exception e) {
logger.error(e);
} finally {
//释放锁
serviceLock.unlock();
}
上述过程有个隐含的步骤,那就是一定要有超时机制,不然agent挂掉,一直不返回数据,会造成死锁,平台在这一版本中支持了同步调用agent,保证能在有限的时间内完成调用,并在同一个线程中释放锁。
关于同步调用如何实现会在以后的案例中分析。
测试用例
/**
* Created by liuroy on 2017/9/25.
*/
public class ReentrantServiceLockTest {
@Test
public void testServiceLock() {
ServiceLockFactory lockFactory = SpiServiceFactory.getService(ServiceLockFactory.class);
Runnable task1 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
Thread.sleep(3000);
System.out.println("Thread " + Thread.currentThread().getId() + " running");
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
new Thread(task1).start();
Runnable task2 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
System.out.println("Thread " + Thread.currentThread().getId() + " unget lock");
}
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
System.out.println("Thread " + Thread.currentThread().getId() + " running");
Thread.sleep(3000);
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
Runnable task3 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
System.out.println("Thread " + Thread.currentThread().getId() + " running");
Thread.sleep(4000);
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
new Thread(task2).start();
new Thread(task3).start();
try {
Thread.sleep(20000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
/**
* Created by liuroy on 2017/9/25.
*/
public class ZookeeperServiceLockTest {
private ServiceLockFactory lockFactory;
@Before
public void init() {
lockFactory = new ZookeeperServiceLockFactory();
}
@Test
public void testServiceLock() {
Runnable task1 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
lock.lock();
System.out.println("Thread1 getted lock");
Thread.sleep(3000);
System.out.println("Thread1 running");
System.out.println("Thread1 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
new Thread(task1).start();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Runnable task2 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
System.out.println("Thread2 unget lock");
}
lock.lock();
System.out.println("Thread2 getted lock");
System.out.println("Thread2 running");
Thread.sleep(3000);
System.out.println("Thread2 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
Runnable task3 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
lock.lock();
System.out.println("Thread3 getted lock");
System.out.println("Thread3 running");
Thread.sleep(4000);
System.out.println("Thread3 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
new Thread(task3).start();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(task2).start();
try {
Thread.sleep(20000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
@After
public void fini() {
}
}