[Learning Design from Source Code] Ant Financial SOFARegistry: Bringing a Service Online
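0x00 Abstract
SOFARegistry is a production-grade, timely, and highly available service registry open-sourced by Ant Financial.
This series focuses on analyzing design and architecture: across several articles and from several angles, it works backwards from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba designs systems.
This is the thirteenth article; it looks at bringing a service online from the SessionServer's point of view.
The article mainly walks through the business flow, sorting out the logic, design, and patterns along the way. Because registration touches too many modules, it concentrates only on the Session Server part of the registration process.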
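0x01 Business domain
1.1 Application scenario
Bringing a service online or offline here means that the service performs a regular registration (Publisher#register) or deregistration (Publisher#unregister) through code; going offline because of a crash or other unexpected failure is out of scope.
1.1.1 Service publication
In a typical "service addressing for RPC calls" scenario, the service provider publishes a service in two steps:
- Register: it registers itself with SOFARegistry in the Publisher role;
- Publish: it publishes the data to be exposed (usually the IP address, port, invocation method, and so on) to SOFARegistry.
Correspondingly, the service consumer makes the call possible through these steps:
- Register: it registers itself with SOFARegistry in the Subscriber role;
- Subscribe: it receives the service data pushed by SOFARegistry.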
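1.1.2 Why SessionServer is needed
In SOFARegistry, whenever a Client registers or subscribes to data, a consistent hash over the dataInfoId decides which DataServer should be accessed, and a long-lived connection is then established to that DataServer.
Each Client usually registers and subscribes to a fairly large number of dataInfoIds, so we can expect every Client to hold connections to several DataServers. The problem with this architecture (Clients connecting to DataServers directly) is: "the number of connections each DataServer carries grows with the number of Clients, and in the extreme case every Client has to connect to every DataServer, so scaling out DataServers does not spread the Client connections linearly".
It is therefore important to design a dedicated connection proxy layer in front of the data sharding layer (DataServer), and this is why SOFARegistry has the SessionServer layer. As the number of Clients grows, the single-machine connection bottleneck can be resolved by scaling out SessionServers.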
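1.2 Problems
Because SessionServer is a middle layer, it looks fairly simple: on the surface it just accepts requests and forwards them.
In a large system, however, how to partition and decouple modules both logically and physically is a question that has to be answered.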
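1.3 Alibaba's solution
We mainly look at the registration part of Alibaba's solution.
1.3.1 Registration process
Bringing a service online or offline means that the service performs a normal registration (publisher.register) or deregistration (publisher.unregister) through code; going offline because of a crash or other unexpected failure is not considered. As the figure above shows, this is roughly how service data flows internally during one service registration.
- The Client calls publisher.register to register the service with SessionServer.
- After receiving the service data (PublisherRegister), SessionServer writes it into memory (SessionServer keeps the Client's data in memory so that it can later be checked periodically against DataServer), then uses a consistent hash of the dataInfoId to find the corresponding DataServer and sends the PublisherRegister to it.
- On receiving the PublisherRegister, DataServer also first writes it into memory, aggregating all PublisherRegister entries per dataInfoId. At the same time, DataServer notifies all SessionServers of the change event for this dataInfoId; the event carries the dataInfoId and the version number.
- Meanwhile, DataServer asynchronously and incrementally synchronizes the data, per dataInfoId, to the other replicas, because on top of consistent-hash sharding DataServer keeps multiple replicas of each shard (three by default).
- When SessionServer receives the change notification, it compares the version of the dataInfoId stored in its own memory; if that is lower than the version sent by DataServer, it actively fetches the complete data for the dataInfoId from DataServer, that is, the full list of PublisherRegister entries for that dataInfoId.
- Finally, SessionServer pushes the data to the relevant Clients, which thereby receive the latest service list after this registration.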
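The version check in the SessionServer step can be sketched as follows. This is a minimal illustration under stated assumptions, not SOFARegistry code: the class `VersionGate` and its methods are hypothetical names, and versions are assumed to be plain `long` values that only grow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: decides whether a change notification requires a fetch. */
class VersionGate {
    // dataInfoId -> last version this session node has fetched
    private final Map<String, Long> localVersions = new ConcurrentHashMap<>();

    /** Returns true when the notified version is newer than the locally known one. */
    boolean shouldFetch(String dataInfoId, long notifiedVersion) {
        Long local = localVersions.get(dataInfoId);
        return local == null || local < notifiedVersion;
    }

    /** Records the version after a full fetch; the stored version never moves backwards. */
    void recordFetched(String dataInfoId, long fetchedVersion) {
        localVersions.merge(dataInfoId, fetchedVersion, Long::max);
    }
}
```

Notifications carrying an old or equal version are ignored, so a burst of duplicate change events costs only one fetch.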
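1.3.2 Diagram
The figure below shows the code flow of Publisher registration.
This process is also handled in the Handler – Task & Strategy – Listener style, and the way tasks flow through the code is essentially the same as in the subscription process.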
0x02 Client SDK
PublisherRegistration is the Client-facing interface. The key code for publishing data is:
// Build the publisher registration
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");
// Register the registration with the client and publish the data
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");
// To overwrite the previously published data, republish through the publisher
publisher.republish("10.10.1.1:12200?xx=zz");
The key to publishing data is building a PublisherRegistration, which has three properties:
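Property | Type | Description |
---|---|---|
dataId | String | Data ID. Publishers and subscribers must use the same value; the unique data identifier is composed of dataId + group + instanceId. |
group | String | Data group. Publishers and subscribers must use the same value; the unique data identifier is composed of dataId + group + instanceId. Defaults to DEFAULT_GROUP. |
appName | String | The application's appName. |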
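Because publishers and subscribers are matched on the same dataId, group, and instanceId, both sides effectively derive one composite key. The sketch below shows the idea only: `DataKeys`, `compositeKey`, and the `#@#` delimiter are assumptions made for illustration, and the real SOFARegistry format may differ.

```java
import java.util.Objects;

/** Hypothetical helper: builds one composite key from the three identifying fields. */
class DataKeys {
    // Assumed delimiter, chosen for illustration only.
    private static final String DELIMITER = "#@#";
    // Default group name taken from the property table above.
    private static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    static String compositeKey(String dataId, String instanceId, String group) {
        Objects.requireNonNull(dataId, "dataId");
        Objects.requireNonNull(instanceId, "instanceId");
        // Fall back to the default group when none is given.
        String effectiveGroup = (group == null || group.isEmpty()) ? DEFAULT_GROUP : group;
        return dataId + DELIMITER + instanceId + DELIMITER + effectiveGroup;
    }
}
```

A publisher and a subscriber that pass the same three values get the same key, while a different group yields a different key and therefore no match.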
0x03 Session server
The flow now arrives at the Session server.
3.1 Bean
A good place to start is the Beans.
@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(publisherHandler());
list.add(subscriberHandler());
list.add(watcherHandler());
list.add(clientNodeConnectionHandler());
list.add(cancelAddressRequestHandler());
list.add(syncConfigHandler());
return list;
}
serverHandlers is the set of response handlers of the Bolt Server.
@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
return new SessionRegistry();
}
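From the Bean perspective, the current logic is as shown in the diagram. Note that a Strategy already introduces one level of decoupling here: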
Beans
+-----------------------------------+
| Bolt Server(in openSessionServer) | +---------------------------------+
| | +-> | DefaultPublisherHandlerStrategy |
| +----------------------+ | | +---------+-----------------------+
| | serverHandlers | | | |
| | | | | |
| | +------------------+ | | | |
| | | PublisherHandle+----------------+ v
| | | | | | +-------+-------+
| | | watcherHandler | | | |SessionRegistry|
| | | | | | +---------------+
| | | ...... | | |
| | +------------------+ | |
| +----------------------+ |
+-----------------------------------+
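Service publishers and the Session Server should generally live in the same data center, which matches the unit-based deployment practiced at Alibaba and elsewhere.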
3.2 Entry point
PublisherHandler is the Session Server's interface to the Client, and one of the Bolt Server's response handlers.
public class PublisherHandler extends AbstractServerHandler {
@Autowired
private ExecutorManager executorManager;
@Autowired
private PublisherHandlerStrategy publisherHandlerStrategy;
@Override
public Object reply(Channel channel, Object message) throws RemotingException {
RegisterResponse result = new RegisterResponse();
PublisherRegister publisherRegister = (PublisherRegister) message;
publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
return result;
}
}
The logic is shown in the following diagram:
Publisher + Session Server Scope
Scope |
| +-----------------------------------+
| | Bolt Server(in openSessionServer) |
| | |
| | +----------------------+ |
+ | | serverHandlers | |
| | | |
+--------+ PublisherRegister | | +------------------+ | |
| Client +---------------------------> PublisherHandler | | |
+--------+ 1 | | | | | |
+ | | | ...... | | |
| | | +------------------+ | |
| | +----------------------+ |
| +-----------------------------------+
|
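3.3 Strategy
Overall, processing here follows the Handler – Task & Strategy – Listener style.
What is the Strategy Pattern?
In software development we often meet the situation where a feature can be implemented by several algorithms or strategies, and the right one depends on the environment or conditions. If these algorithms are abstracted behind one common interface, with a separate implementation class for each, the client code can swap algorithms dynamically by injecting a different implementation object. This also makes the design easier to extend and maintain. That is the strategy pattern.
Definition of the Strategy Pattern
- Strategy pattern: defines a family of algorithms, encapsulates each one, and makes them interchangeable, so that the algorithm can vary independently of the clients that use it.
- In plain terms: a set of algorithms is defined, each is encapsulated, they can replace one another, and changing the algorithm does not affect the client using it.
In the Strategy Pattern, the behavior of a class or its algorithm can be changed at run time. It is a behavioral design pattern.
In the strategy pattern we create objects that represent the various strategies and a context object whose behavior changes as the strategy object changes. The strategy object changes the algorithm that the context object executes.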
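As a small, generic illustration of the pattern (unrelated to SOFARegistry's own classes), the sketch below lets a context compute a retry delay through an injected strategy. All names here (`RetryDelayStrategy`, `FixedDelay`, `IncrementalDelay`, `RetryPlanner`) are hypothetical.

```java
/** Strategy interface: one operation, several interchangeable implementations. */
interface RetryDelayStrategy {
    long delayMillis(int attempt); // attempt is 1-based
}

/** Always waits the same amount of time. */
class FixedDelay implements RetryDelayStrategy {
    private final long millis;

    FixedDelay(long millis) {
        this.millis = millis;
    }

    @Override
    public long delayMillis(int attempt) {
        return millis;
    }
}

/** Waits longer on every further attempt: first, first + increment, ... */
class IncrementalDelay implements RetryDelayStrategy {
    private final long first;
    private final long increment;

    IncrementalDelay(long first, long increment) {
        this.first = first;
        this.increment = increment;
    }

    @Override
    public long delayMillis(int attempt) {
        return first + increment * (attempt - 1);
    }
}

/** Context: its behavior depends on whichever strategy is currently injected. */
class RetryPlanner {
    private RetryDelayStrategy strategy;

    RetryPlanner(RetryDelayStrategy strategy) {
        this.strategy = strategy;
    }

    void setStrategy(RetryDelayStrategy strategy) {
        this.strategy = strategy;
    }

    long nextDelay(int attempt) {
        return strategy.delayMillis(attempt);
    }
}
```

The caller of `RetryPlanner` never changes when a new delay policy is added; only a new `RetryDelayStrategy` implementation is written and injected.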
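3.3.1 Directory structure
Judging from the directory structure, there are many Strategy definitions and implementations. Presumably Ant wants to apply different strategies in different situations internally, and some of these are extension points reserved for now.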
com/alipay/sofa/registry/server/session/strategy
.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
├── DefaultDataChangeRequestHandlerStrategy.java
├── DefaultPublisherHandlerStrategy.java
├── DefaultPushTaskMergeProcessor.java
├── DefaultReceivedConfigDataPushTaskStrategy.java
├── DefaultReceivedDataMultiPushTaskStrategy.java
├── DefaultSessionRegistryStrategy.java
├── DefaultSubscriberHandlerStrategy.java
├── DefaultSubscriberMultiFetchTaskStrategy.java
├── DefaultSubscriberRegisterFetchTaskStrategy.java
├── DefaultSyncConfigHandlerStrategy.java
└── DefaultWatcherHandlerStrategy.java
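3.3.2 DefaultPublisherHandlerStrategy
Judging from the current code, it only sets defaults, classifies, and forwards: it fills in the Publisher's default fields and then calls register or unRegister depending on the event type.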
public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
@Autowired
private Registry sessionRegistry;
@Override
public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
try {
String ip = channel.getRemoteAddress().getAddress().getHostAddress();
int port = channel.getRemoteAddress().getPort();
publisherRegister.setIp(ip);
publisherRegister.setPort(port);
if (StringUtils.isBlank(publisherRegister.getZone())) {
publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
}
if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
}
Publisher publisher = PublisherConverter.convert(publisherRegister);
publisher.setProcessId(ip + ":" + port);
publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
sessionRegistry.register(publisher);
} else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
sessionRegistry.unRegister(publisher);
}
registerResponse.setSuccess(true);
registerResponse.setVersion(publisher.getVersion());
registerResponse.setRegistId(publisherRegister.getRegistId());
registerResponse.setMessage("Publisher register success!");
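} catch (Exception e) {
// Error handling is abridged in this excerpt; at minimum, report the failure.
registerResponse.setSuccess(false);
}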
}
}
The logic is shown in the following diagram:
Publisher + Session Server Scope
Scope |
| +-----------------------------------+
| | Bolt Server(in openSessionServer) |
| | |
| | +----------------------+ |
+ | | serverHandlers | | +-------------------------------+
| | | | |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER |
+--------+ 1 | | | | | | | |
+ | | | watcherHandler | | | +-------------------------------+
| | | | | | |
| | | | ...... | | |
| | | +------------------+ | |
| | +----------------------+ |
+-----------------------------------+
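3.4 Core logic component
In the code above, the strategy calls sessionRegistry.register(publisher), which is the registration function.
The member variables of SessionRegistry alone show that this is where the core logic of Session Server lives.
It mainly provides the following functions:
- register(StoreData data): register new publisher or subscriber data
- cancel(List connectIds): cancel publisher or subscriber data
- remove(List connectIds): remove publisher or subscriber data
- unRegister(StoreData data): unregister publisher or subscriber data
- ...
The member variables are as follows: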
public class SessionRegistry implements Registry {
/**
* store subscribers
*/
@Autowired
private Interests sessionInterests;
/**
* store watchers
*/
@Autowired
private Watchers sessionWatchers;
/**
* store publishers
*/
@Autowired
private DataStore sessionDataStore;
/**
* transfer data to DataNode
*/
@Autowired
private DataNodeService dataNodeService;
/**
* trigger task com.alipay.sofa.registry.server.meta.listener process
*/
@Autowired
private TaskListenerManager taskListenerManager;
/**
* calculate data node url
*/
@Autowired
private NodeManager dataNodeManager;
@Autowired
private SessionServerConfig sessionServerConfig;
@Autowired
private Exchange boltExchange;
@Autowired
private SessionRegistryStrategy sessionRegistryStrategy;
@Autowired
private WrapperInterceptorManager wrapperInterceptorManager;
@Autowired
private DataIdMatchStrategy dataIdMatchStrategy;
@Autowired
private RenewService renewService;
@Autowired
private WriteDataAcceptor writeDataAcceptor;
private volatile boolean enableDataRenewSnapshot = true;
}
For a Publisher, the register function first adds it to sessionDataStore, then builds a WriteDataRequest and hands it to writeDataAcceptor.accept for processing.
@Override
public void register(StoreData storeData) {
WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
new Wrapper<StoreData, Boolean>() {
@Override
public Boolean call() {
switch (storeData.getDataType()) {
case PUBLISHER:
Publisher publisher = (Publisher) storeData;
sessionDataStore.add(publisher);
// All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
// are handed over to WriteDataAcceptor
writeDataAcceptor.accept(new WriteDataRequest() {
@Override
public Object getRequestBody() {
return publisher;
}
@Override
public WriteDataRequestType getRequestType() {
return WriteDataRequestType.PUBLISHER;
}
@Override
public String getConnectId() {
return publisher.getSourceAddress().getAddressString();
}
@Override
public String getDataServerIP() {
Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
return dataNode.getNodeUrl().getIpAddress();
}
});
sessionRegistryStrategy.afterPublisherRegister(publisher);
break;
case SUBSCRIBER:
Subscriber subscriber = (Subscriber) storeData;
sessionInterests.add(subscriber);
sessionRegistryStrategy.afterSubscriberRegister(subscriber);
break;
case WATCHER:
Watcher watcher = (Watcher) storeData;
sessionWatchers.add(watcher);
sessionRegistryStrategy.afterWatcherRegister(watcher);
break;
default:
break;
}
return null;
}
@Override
public Supplier<StoreData> getParameterSupplier() {
return () -> storeData;
}
}, wrapperInterceptorManager);
try {
wrapperInvocation.proceed();
} catch (Exception e) {
throw new RuntimeException("Proceed register error!", e);
}
}
The logic so far is shown in the following diagram:
Publisher + Session Server Scope
Scope |
| +-----------------------------------+
| | Bolt Server(in openSessionServer) |
| | |
| | +----------------------+ |
+ | | serverHandlers | | +-------------------------------+
| | | | |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER |
+--------+ 1 | | | | | | 2 | |
+ | | | watcherHandler | | | +------------+------------------+
| | | | | | | |
| | | | ...... | | | |
| | | +------------------+ | | 3 | register
| | +----------------------+ | |
+-----------------------------------+ |
v
+-------------------+-------------------+
| SessionRegistry |
| |
| |
| storeData.getDataType() == PUBLISHER |
+---------------------------------------+
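3.4.1 SessionRegistryStrategy
Another strategy appears here. It also has only one implementation for now, presumably so that it can be swapped out in the future; at present the hook is simply left empty.
This shows how consistently Alibaba aims for decoupling.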
public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
@Override
public void afterPublisherRegister(Publisher publisher) {
}
}
3.4.2 Storage module
The registration code above contains this call:
sessionDataStore.add(publisher);
This is the Session's data storage module, which is also the core of the system.
public class SessionDataStore implements DataStore {
/**
* publisher store
*/
private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry = new ConcurrentHashMap<>();
/*** index */
private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>> connectIndex = new ConcurrentHashMap<>();
}
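Publishers are indexed in two ways here: by dataInfoId and by connectId.
When storing, the new record is compared with the existing one on two dimensions: version number and timestamp.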
@Override
public void add(Publisher publisher) {
Publisher.internPublisher(publisher);
write.lock();
try {
Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());
if (publishers == null) {
ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
if (publishers == null) {
publishers = newmap;
}
}
Publisher existingPublisher = publishers.get(publisher.getRegisterId());
if (existingPublisher != null) {
if (existingPublisher.getVersion() != null) {
long oldVersion = existingPublisher.getVersion();
Long newVersion = publisher.getVersion();
if (newVersion == null) {
return;
} else if (oldVersion > newVersion) {
return;
} else if (oldVersion == newVersion) {
Long newTime = publisher.getRegisterTimestamp();
long oldTime = existingPublisher.getRegisterTimestamp();
if (newTime == null) {
return;
}
if (oldTime > newTime) {
return;
}
}
}
}
publishers.put(publisher.getRegisterId(), publisher);
addToConnectIndex(publisher);
} finally {
write.unlock();
}
}
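The overwrite rule inside add can be isolated as a pure function, which makes the ordering easier to see. This is a sketch that mirrors the checks in the excerpt above; `PublisherOrdering` and `shouldReplace` are hypothetical names, not part of SOFARegistry.

```java
/** Hypothetical sketch of the version/timestamp rule used when storing a publisher. */
class PublisherOrdering {
    /**
     * Returns true if the incoming record should overwrite the existing one.
     * Versions are compared first; the register timestamp only breaks ties.
     */
    static boolean shouldReplace(Long oldVersion, long oldTime, Long newVersion, Long newTime) {
        if (oldVersion == null) {
            return true;  // existing record carries no version: always overwrite
        }
        if (newVersion == null) {
            return false; // cannot order an unversioned record against a versioned one
        }
        if (oldVersion > newVersion) {
            return false; // incoming record is stale
        }
        if (oldVersion.longValue() == newVersion.longValue()) {
            if (newTime == null) {
                return false;
            }
            return oldTime <= newTime; // same version: the later (or equal) timestamp wins
        }
        return true;      // strictly newer version
    }
}
```

For example, a record with version 2 and timestamp 10 is replaced by version 3 regardless of timestamp, and by another version 2 only if its timestamp is at least 10.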
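3.5 Acceptor module
Once the SessionServer has stored the data itself, the next step is to notify the Data Server.
3.5.1 The Acceptor as a whole
WriteDataAcceptorImpl handles the actual Publisher writes. First, all write requests have to be brought under one roof.
It uses private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); to hold the processors for all write requests.
Requests are dispatched by connection: each connection's write requests go to that connection's own processor.
The code is as follows: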
public class WriteDataAcceptorImpl implements WriteDataAcceptor {
@Autowired
private TaskListenerManager taskListenerManager;
@Autowired
private SessionServerConfig sessionServerConfig;
@Autowired
private RenewService renewService;
/**
* acceptor for all write data request
* key:connectId
* value:writeRequest processor
*
*/
private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();
public void accept(WriteDataRequest request) {
String connectId = request.getConnectId();
WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));
writeDataProcessor.process(request);
}
public void remove(String connectId) {
writeDataProcessors.remove(connectId);
}
}
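The "one processor per connection" idea rests on `ConcurrentHashMap.computeIfAbsent`, which creates the value at most once per key. Below is a stripped-down sketch; `PerConnectionRouter` and `Worker` are hypothetical stand-ins for the acceptor and its processor, and requests are plain strings for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: routes every request to the worker owned by its connection. */
class PerConnectionRouter {
    /** Stand-in for a per-connection processor; it only counts what it receives. */
    static final class Worker {
        final String connectId;
        final AtomicInteger handled = new AtomicInteger();

        Worker(String connectId) {
            this.connectId = connectId;
        }

        void process(String request) {
            handled.incrementAndGet();
        }
    }

    // key: connectId, value: the worker for that connection
    private final Map<String, Worker> workers = new ConcurrentHashMap<>();

    Worker accept(String connectId, String request) {
        // Atomically creates the worker on first use and reuses it afterwards.
        Worker worker = workers.computeIfAbsent(connectId, Worker::new);
        worker.process(request);
        return worker;
    }

    /** Drops the worker, for example when the connection is closed. */
    void remove(String connectId) {
        workers.remove(connectId);
    }

    int size() {
        return workers.size();
    }
}
```

Keeping state per connection means ordering and locking decisions for one client never block the writes of another.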
The logic so far is shown in the following diagram:
Publisher + Session Server Scope
Scope |
| +-----------------------------------+
| | Bolt Server(in openSessionServer) |
| | |
| | +----------------------+ |
+ | | serverHandlers | | +-------------------------------+
| | | | |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER |
+--------+ 1 | | | | | | 2 | |
+ | | | watcherHandler | | | +------------+------------------+
| | | | | | | |
| | | | ...... | | | |
| | | +------------------+ | | register | 3
| | +----------------------+ | |
| +-----------------------------------+ |
| v
| +-----------------------------------------------------+ +-------------+-------------------------+
| | WriteDataAcceptorImpl | WriteDataRequest | SessionRegistry |
| | | <------------------+ |
| | | | |
| | Map<String, WriteDataProcessor> writeDataProcessors | | storeData.getDataType() == PUBLISHER |
| | | +---------------------------------------+
+ +-----------------------------------------------------+
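3.5.2 Per-connection processing
With all requests gathered in one place, the writes of each connection now need further processing.
The key data structure is the following queue, in which the write requests of a connection can be parked:
ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue
Each request type is handled differently.
In our example, the handling is: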
case PUBLISHER: {
doPublishAsync(request);
}
In the end, a TaskEvent of type TaskType.PUBLISH_DATA_TASK is sent to taskListenerManager; PublishDataTaskListener picks it up and handles it with a PublishDataTask.
A listener provides the decoupling here, which we explain next.
private void doPublishAsync(WriteDataRequest request) {
sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}
private void sendEvent(Object eventObj, TaskType taskType) {
TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
taskListenerManager.sendTaskEvent(taskEvent);
}
The full code is as follows:
public class WriteDataProcessor {
private final TaskListenerManager taskListenerManager;
private final SessionServerConfig sessionServerConfig;
private final RenewService renewService;
private final String connectId;
private Map<String, AtomicLong> lastUpdateTimestampMap = new ConcurrentHashMap<>();
private AtomicBoolean writeDataLock = new AtomicBoolean(
false);
private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue = new ConcurrentLinkedQueue();
private AtomicInteger acceptorQueueSize = new AtomicInteger(0);
public void process(WriteDataRequest request) {
// record the last update time by pub/unpub
if (isWriteRequest(request)) {
refreshUpdateTime(request.getDataServerIP());
}
if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
// snapshot has high priority, so handle directly
doHandle(request);
} else {
// If locked, insert the queue;
// otherwise, try emptying the queue (to avoid residue) before processing the request.
if (writeDataLock.get()) {
addQueue(request);
} else {
flushQueue();
doHandle(request);
}
}
}
private void doHandle(WriteDataRequest request) {
switch (request.getRequestType()) {
case PUBLISHER: {
doPublishAsync(request);
}
break;
case UN_PUBLISHER: {
doUnPublishAsync(request);
}
break;
case CLIENT_OFF: {
doClientOffAsync(request);
}
break;
case RENEW_DATUM: {
if (renewAndSnapshotInSilence(request.getDataServerIP())) {
return;
}
doRenewAsync(request);
}
break;
case DATUM_SNAPSHOT: {
if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
return;
}
halt();
try {
doSnapshotAsync(request);
} finally {
resume();
}
}
break;
}
}
private void doPublishAsync(WriteDataRequest request) {
sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}
private void sendEvent(Object eventObj, TaskType taskType) {
TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
taskListenerManager.sendTaskEvent(taskEvent);
}
}
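The "queue while locked, otherwise flush and handle" behavior of process can be reduced to the sketch below. It is a simplified model, not the real class: `GatedProcessor` is a hypothetical name, requests are plain strings, and flushing the queue inside resume is an assumption. Like any check-then-act code on an AtomicBoolean, it tolerates rather than eliminates the race between checking the flag and enqueueing; the flush before each direct handle is what picks up such leftovers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a processor that parks requests while it is halted. */
class GatedProcessor {
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<String> handled = new ArrayList<>(); // guarded by 'this'

    void halt() {
        halted.set(true);
    }

    /** Assumption: resuming also drains whatever was parked in the meantime. */
    void resume() {
        halted.set(false);
        flush();
    }

    void process(String request) {
        if (halted.get()) {
            pending.add(request);  // parked until resume
        } else {
            flush();               // drain leftovers first to keep arrival order
            handle(request);
        }
    }

    private void flush() {
        String next;
        while ((next = pending.poll()) != null) {
            handle(next);
        }
    }

    private synchronized void handle(String request) {
        handled.add(request);
    }

    synchronized List<String> handled() {
        return new ArrayList<>(handled);
    }
}
```

In the excerpt above the same gate is what lets a DATUM_SNAPSHOT run between halt() and resume() without ordinary writes interleaving with it.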
This is shown in the following diagram:
Publisher + Session Server Scope
Scope |
| +-----------------------------------+
| | Bolt Server(in openSessionServer) |
| | |
| | +----------------------+ |
+ | | serverHandlers | | +-------------------------------+
| | | | |DefaultPublisherHandlerStrategy|
+--------+ PublisherRegister | | +------------------+ | | handlePublisherRegister | |
| Client +---------------------------> PublisherHandler+------------------------------------>+ EventType == REGISTER |
+--------+ 1 | | | | | | 2 | |
+ | | | watcherHandler | | | +------------+------------------+
| | | | | | | |
| | | | ...... | | | |
| | | +------------------+ | | register | 3
| | +----------------------+ | |
| +-----------------------------------+ |
| v
| +---------------------------------------------------------+ +---------+-----------------------------+
| | WriteDataAcceptorImpl | WriteDataRequest | SessionRegistry |
| | | <------------------+ |
| | | 4 | sessionDataStore.add(publisher) |
| | Map<connectId , WriteDataProcessor> writeDataProcessors | | |
| | | | storeData.getDataType() == PUBLISHER |
| +----------------------+----------------------------------+ | |
| process | 5 +---------------------------------------+
| v
| +-------------------+---------------------+ +--------------------------+
| | WriteDataProcessor | | PublishDataTaskListener |
| | | PUBLISH_DATA_TASK | |
| | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> | PublishDataTask |
| | | 6 +--------------------------+
+ +-----------------------------------------+
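3.6 Listener decoupling
Up to this point the logic has been one continuous call chain; here it is decoupled.
3.6.1 The decoupling engine
DefaultTaskListenerManager is the decoupling mechanism. Listeners are registered with it by task type, and when sendTaskEvent is called it iterates over the listeners registered for the event's task type and invokes each of them.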
public class DefaultTaskListenerManager implements TaskListenerManager {
private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();
@Override
public Multimap<TaskType, TaskListener> getTaskListeners() {
return taskListeners;
}
@Override
public void addTaskListener(TaskListener taskListener) {
taskListeners.put(taskListener.support(), taskListener);
}
@Override
public void sendTaskEvent(TaskEvent taskEvent) {
Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
for (TaskListener taskListener : taskListeners) {
taskListener.handleEvent(taskEvent);
}
}
}
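The same dispatch can be written without Guava, which may help when reading the class above. The sketch below uses only the JDK; `DemoTaskType`, `DemoListener`, and `DemoListenerManager` are hypothetical names, and payloads are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

enum DemoTaskType { PUBLISH, UNPUBLISH }

/** A listener declares which task type it supports and handles matching events. */
interface DemoListener {
    DemoTaskType support();

    void handle(String payload);
}

/** Hypothetical sketch: routes an event to every listener registered for its type. */
class DemoListenerManager {
    private final Map<DemoTaskType, List<DemoListener>> listeners =
            new EnumMap<>(DemoTaskType.class);

    void add(DemoListener listener) {
        listeners.computeIfAbsent(listener.support(), k -> new ArrayList<>()).add(listener);
    }

    /** Returns how many listeners received the event. */
    int send(DemoTaskType type, String payload) {
        List<DemoListener> targets = listeners.getOrDefault(type, Collections.emptyList());
        for (DemoListener listener : targets) {
            listener.handle(payload);
        }
        return targets.size();
    }
}
```

The sender only knows the task type, never the listener class, so new behavior is added by registering another listener rather than by editing the sender.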
3.6.2 Listener
PublishDataTaskListener is the matching handler: its support method declares that it handles PUBLISH_DATA_TASK. That completes the decoupling.
public class PublishDataTaskListener implements TaskListener {
@Autowired
private DataNodeService dataNodeService;
@Autowired
private TaskProcessor dataNodeSingleTaskProcessor;
@Autowired
private ExecutorManager executorManager;
@Override
public TaskType support() {
return TaskType.PUBLISH_DATA_TASK;
}
@Override
public void handleEvent(TaskEvent event) {
SessionTask publishDataTask = new PublishDataTask(dataNodeService);
publishDataTask.setTaskEvent(event);
executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
}
}
3.7 Task scheduling
We have found the Listener, which starts the business task with the following code. The mechanism behind it deserves a closer look.
executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
3.7.1 ExecutorManager
ExecutorManager starts and shuts down the thread pools in one place; publishDataExecutor is one of them.
The relevant ExecutorManager code is excerpted below:
public class ExecutorManager {
private final ScheduledThreadPoolExecutor scheduler;
private final ThreadPoolExecutor publishDataExecutor;
private static final String PUBLISH_DATA_EXECUTOR = "PublishDataExecutor";
public ExecutorManager(SessionServerConfig sessionServerConfig) {
publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
sessionServerConfig.getPublishDataExecutorMinPoolSize(),
sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
new NamedThreadFactory("PublishData-executor", true)));
}
public ThreadPoolExecutor getPublishDataExecutor() {
return publishDataExecutor;
}
}
The ExecutorManager bean is defined as follows:
@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
return new ExecutorManager(sessionServerConfig);
}
3.7.2 Processor
The Processor defines how a task is run: it wraps the task's execution and maps the outcome to a processing result.
public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {
@Override
public ProcessingResult process(SessionTask task) {
try {
task.execute();
return ProcessingResult.Success;
} catch (Throwable throwable) {
if (task instanceof Retryable) {
Retryable retryAbleTask = (Retryable) task;
if (retryAbleTask.checkRetryTimes()) {
return ProcessingResult.TransientError;
}
}
return ProcessingResult.PermanentError;
}
}
@Override
public ProcessingResult process(List<SessionTask> tasks) {
return null;
}
}
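How a failed execution turns into a retryable or a permanent result can be sketched as follows. The names `Outcome`, `RetryingTask`, and `TaskRunner` are hypothetical, and the retry budget semantics (each failure consumes one retry) are an assumption made for illustration.

```java
enum Outcome { SUCCESS, TRANSIENT_ERROR, PERMANENT_ERROR }

/** Hypothetical task with a bounded retry budget. */
class RetryingTask {
    private final Runnable body;
    private final int maxRetries;
    private int failures = 0;

    RetryingTask(Runnable body, int maxRetries) {
        this.body = body;
        this.maxRetries = maxRetries;
    }

    void execute() {
        body.run();
    }

    /** Counts one failure; returns true while the retry budget is not exhausted. */
    boolean checkRetryTimes() {
        failures++;
        return failures <= maxRetries;
    }
}

/** Runs a task once and classifies the result. */
class TaskRunner {
    static Outcome process(RetryingTask task) {
        try {
            task.execute();
            return Outcome.SUCCESS;
        } catch (RuntimeException e) {
            // Retry budget left: the caller may schedule the task again.
            return task.checkRetryTimes() ? Outcome.TRANSIENT_ERROR : Outcome.PERMANENT_ERROR;
        }
    }
}
```

With a budget of two retries, a task that always fails is reported as transient twice and as permanent on the third failure.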
3.7.3 Business Task
In its execute method, PublishDataTask calls dataNodeService.register(publisher) to perform the registration.
public class PublishDataTask extends AbstractSessionTask {
private final DataNodeService dataNodeService;
private Publisher publisher;
public PublishDataTask(DataNodeService dataNodeService) {
this.dataNodeService = dataNodeService;
}
@Override
public void execute() {
dataNodeService.register(publisher);
}
@Override
public void setTaskEvent(TaskEvent taskEvent) {
//taskId create from event
if (taskEvent.getTaskId() != null) {
setTaskId(taskEvent.getTaskId());
}
Object obj = taskEvent.getEventObj();
if (obj instanceof Publisher) {
this.publisher = (Publisher) obj;
}
}
}
The structure is as follows:
+-------------------------------------------------+
| DefaultTaskListenerManager |
| |
| |
| Multimap<TaskType, TaskListener> taskListeners |
| |
+----------------------+--------------------------+
|
|
PUBLISH_DATA_TASK |
|
v
+------------+--------------+
| PublishDataTaskListener |
+------------+--------------+
|
setTaskEvent |
|
v
+--------+--------+
| PublishDataTask |
+-----------------+
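3.8 Forwarding service information
After the listener decoupling, PublishDataTask calls dataNodeService.register(publisher), so the next step is to forward the service information to the Data Server.
This is done by the register function of DataNodeServiceImpl, which forwards the request to the Data Server.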
public class DataNodeServiceImpl implements DataNodeService {
@Autowired
private NodeExchanger dataNodeExchanger;
@Autowired
private NodeManager dataNodeManager;
@Autowired
private SessionServerConfig sessionServerConfig;
private AsyncHashedWheelTimer asyncHashedWheelTimer;
}
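As we can see, it builds a PublishDataRequest and sends it to the Data Server through the Bolt Client; if sending fails, the request is retried asynchronously.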
@Override
public void register(final Publisher publisher) {
String bizName = "PublishData";
Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
try {
sendRequest(bizName, request);
} catch (RequestException e) {
doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
}
}
private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
Response response = dataNodeExchanger.request(request);
Object result = response.getResult();
CommonResponse commonResponse = (CommonResponse) result;
return commonResponse;
}
As shown below:
+-------------------------------------------------+
| DefaultTaskListenerManager |
| |
| |
| Multimap<TaskType, TaskListener> taskListeners |
| |
+----------------------+--------------------------+
|
PUBLISH_DATA_TASK |
v
+------------+--------------+
| PublishDataTaskListener |
+------------+--------------+
|
setTaskEvent |
v
+--------+--------+
| PublishDataTask |
+--------+--------+
register |
|
+----------v----------+
| DataNodeServiceImpl |
+----------+----------+
PublishDataRequest |
v
+----------+----------+ Client.sendSync +------------+
| DataNodeExchanger +------------------> | Data Server|
+---------------------+ PublishDataRequest +------------+
How does it know which Data Server to send to? DataNodeExchanger contains:
@Override
public Response request(Request request) throws RequestException {
Response response;
URL url = request.getRequestUrl();
try {
Client sessionClient = getClient(url);
final Object result = sessionClient
.sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());
response = () -> result;
}
return response;
}
The URL comes from the request, so we look in DataNodeServiceImpl:
private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
return new Request<PublishDataRequest>() {
private AtomicInteger retryTimes = new AtomicInteger();
@Override
public PublishDataRequest getRequestBody() {
PublishDataRequest publishDataRequest = new PublishDataRequest();
publishDataRequest.setPublisher(publisher);
publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
.getSessionProcessId());
return publishDataRequest;
}
@Override
public URL getRequestUrl() {
return getUrl(publisher.getDataInfoId());
}
@Override
public AtomicInteger getRetryTimes() {
return retryTimes;
}
};
}
private URL getUrl(String dataInfoId) {
Node dataNode = dataNodeManager.getNode(dataInfoId);
//meta push data node has not port
String dataIp = dataNode.getNodeUrl().getIpAddress();
return new URL(dataIp, sessionServerConfig.getDataServerPort());
}
DataNodeManager contains:
@Override
public DataNode getNode(String dataInfoId) {
DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
return dataNode;
}
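So a hash is computed from the dataInfoId, the matching DataNode is obtained from DataNodeManager via consistent hashing, and the target URL is built from that node's address.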
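To make the lookup concrete, here is a minimal consistent-hash ring. It is a sketch under assumptions, not SOFARegistry's implementation: `SimpleConsistentHash`, the MD5-based hash, and the virtual-node naming scheme are all chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: maps a key such as a dataInfoId to one node on a hash ring. */
class SimpleConsistentHash {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    SimpleConsistentHash(Collection<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            addNode(node);
        }
    }

    void addNode(String node) {
        // Several virtual points per node spread the load more evenly.
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    String getNodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        // First point clockwise from the key's hash; wrap around to the start if none.
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as the ring position. */
    private static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The useful property is stability: removing a node only moves the keys that node owned, so most dataInfoIds keep their DataServer when the cluster changes.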
The diagram above therefore expands to:
+-------------------------------------------------+
| DefaultTaskListenerManager |
| |
| Multimap<TaskType, TaskListener> taskListeners |
| |
+----------------------+--------------------------+
|
PUBLISH_DATA_TASK | 1
v
+------------+--------------+
| PublishDataTaskListener |
+------------+--------------+
|
setTaskEvent | 2
v
+--------+--------+ 4 +---------------+
| PublishDataTask | +------> |DataNodeManager|
+--------+--------+ | +---------------+
register | 3 | consistentHash|
| | | 5
+----------v----------+---+ v
| DataNodeServiceImpl | 6 +-----+----+
+----------+----------+ <------------+ DataNode |
PublishDataRequest | 7 url +----------+
v
+----------+----------+
| DataNodeExchanger |
+----------+----------+
|
Client.sendSync | PublishDataRequest
|
v 8
+-----+------+
| Data Server|
+------------+