11.源码分析---SOFARPC数据透传是实现的?
先把栗子放上,让大家方便测试用:
Service端
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端
providerConfig.export(); // 发布服务
}
public class HelloServiceImpl implements HelloService {
private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public String sayHello(String string) {
LOGGER.info("Server receive: " + string);
// 获取请求透传数据并打印
System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));
// 设置响应透传数据到当前线程的上下文中
RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");
return "hello " + string + " !";
}
}
client端
public static void main(String[] args) {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定协议
.setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
.setConnectTimeout(10 * 1000);
RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");
HelloService helloService = consumerConfig.refer();
while (true) {
System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));
try {
LOGGER.info(helloService.sayHello("world"));
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
通过上面的栗子我们可以看出整个流程应该是:
- 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
- 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
- 客户端收到透传数据
所以下面我们从客户端开始源码讲解。
客户端数据透传给服务端
首先客户端在引用之前要设置putRequestBaggage
,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。
如下:
ClientProxyInvoker#invoke
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
....
// 包装请求
decorateRequest(request);
....
}
通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。
DefaultClientProxyInvoker#decorateRequest
protected void decorateRequest(SofaRequest request) {
....
RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
RpcInternalContext internalContext = RpcInternalContext.getContext();
if (invokeCtx != null) {
....
// 如果用户指定了透传数据
if (RpcInvokeContext.isBaggageEnable()) {
// 需要透传
BaggageResolver.carryWithRequest(invokeCtx, request);
internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
}
}
....
}
在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面
BaggageResolver#carryWithRequest
public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
if (context != null) {
//获取所有的透传数据
Map<String, String> requestBaggage = context.getAllRequestBaggage();
if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传
request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
}
}
}
这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。
服务端接受透传数据
服务端的调用流程如下:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker
所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:
ProviderBaggageFilter#invoke
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
SofaResponse response = null;
try {
//从request中获取透传数据存入到requestBaggage中
BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
response = invoker.invoke(request);
} finally {
if (response != null) {
BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
}
}
return response;
}
ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest
从request中获取数据
BaggageResolver#pickupFromRequest
public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
if (context == null && !init) {
return;
}
// 解析请求
Map<String, String> requestBaggage = (Map<String, String>) request
.getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
if (CommonUtils.isNotEmpty(requestBaggage)) {
if (context == null) {
context = RpcInvokeContext.getContext();
}
context.putAllRequestBaggage(requestBaggage);
}
}
最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse
把响应透传数据回写到response里面。
public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
if (context != null) {
Map<String, String> responseBaggage = context.getAllResponseBaggage();
if (CommonUtils.isNotEmpty(responseBaggage)) {
String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
response.addResponseProp(prefix + entry.getKey(), entry.getValue());
}
}
}
}
客户端收到响应透传数据
最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
....
// 包装响应
decorateResponse(response);
....
}
decorateResponse是在子类DefaultClientProxyInvoker实现的:
DefaultClientProxyInvoker#decorateResponse
protected void decorateResponse(SofaResponse response) {
....
//如果开启了透传
if (RpcInvokeContext.isBaggageEnable()) {
BaggageResolver.pickupFromResponse(invokeCtx, response, true);
}
....
}
这个方法里面会调用BaggageResolver#pickupFromResponse
public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
if (context == null && !init) {
return;
}
Map<String, String> responseBaggage = response.getResponseProps();
if (CommonUtils.isNotEmpty(responseBaggage)) {
String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
if (context == null) {
context = RpcInvokeContext.getContext();
}
//因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉
context.putResponseBaggage(entry.getKey().substring(prefix.length()),
entry.getValue());
}
}
}
}
这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。
到这里SOFARPC数据透传就分析完毕了