dubbo通过netty将请求发送到provider的时候,provider之前已经启动好的NettyServer监听指定端口的时候会收到来自consumer的请求,将通过网络发送来的二进制编码成Request交给上层处理。dubbo从Request中取出调用信息,找到之前的Invoker,然后经过filter,最后通过代理调用到提供服务的方法。

provider处理请求的调用堆栈如下:

  1. sayHe110:18, TestDubb0Servicelmpl (com.test.service.impl)
  2. invokeMethod:-1, Wrapper1 (com. alibabadubbo. common.bytecode)
  3. dolnvoke:46, JavassistProxyFactory$1 (com.alibaba.dubbo.rpc.proxy.javassist)
  4. invoke:72, AbstractProxylnvoker (com.alibaba.dubbo.rpc.proxy)
  5. invoke:53, InvokerWrapper (com.alibaba.dubbo.rpc.protocol)
  6. invoke:64, ExceptionFilter .com alibaba.dubbo.rpc filter)
  7. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  8. invoke:64, MonitorFilter .com alibaba.dubbo. monitor.support)
  9. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  10. invoke:42, TimeoutFilter .com alibaba.dubbo. rpc.filter)
  11. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  12. invoke:49, TokenFilter .com alibaba.dubbo. roc. filter)
  13. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  14. invoke:78, TraceFilter .com alibaba dubbo. roc. protocol.dubbo.filter)
  15. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  16. invoke:60, ContextFilter .com alibaba.dubbo. roc. filter)
  17. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  18. invoke:132, GenericFilter .com alibaba.dubbo. roc. filter)
  19. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  20. invoke:38, ClassLoaderFilter .com alibaba dubbo.rpc.filter)
  21. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  22. invoke:38, EchoFilter .com alibaba dubbo. rpc filter)
  23. invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol)
  24. reply:108, DubboProtocol$1 .com alibaba dubbo.rpcprotocol.dubbo)
  25. handleRequest:86, HeaderExchangeHandler (com.alibaba.dubbo.remoting.exchange.support.header)
  26. received:172, HeaderExchangeHandler (com.alibaba dubbo. remoting. exchange.support.header)
  27. received:52, DecodeHandler (com.alibaba dubbo.remoting. transport)
  28. run:82, ChannelEventRunnable (com.alibaba.dubbo.remoting.transport.dispatcher)
  29. runWorker:1142, ThreadPoolExecutor (java.util.concurrent)
  30. run:617, ThreadPoolExecutor$Worker (java.util.concurrent)
  31. run:745, Thread (java.lang)

从调用堆栈基本可以看出provider整个处理请求的过程,比较简单,但是需要知道为什么调用过程是这样的?其中关键类是什么时候在初始化的?怎么初始化的?

接下来解决一下问题:

  1. 为什么是从ChannelEventRunnable开始的?谁初始化的ChannelEventRunnable?ChannelEventRunnable作用是什么?
  2. 为什么会调用到上面堆栈中的几个handler(也就是handler是怎么初始化的)?
  3. filter链怎么初始化的?

本来这些问题在export的时候如果仔细查看源码已经可以解决了,但是真正用到的时候是处理请求的时候,所以这里算是补上之前export过程的一些关键步骤。

上面的调用堆栈中,是在线程池中一个单独的线程来处理请求,所以先从线程池中调用的线程开始,ChannelEventRunnable的构造过程。

接着前面provider export的时候会启动NettyServer,所以ChannelEventRunnable的创建也从NettyServer的启动说起,ChannelEventRunnable被初始化的过程会涉及到netty的部分内容:

  1. NettyServer#doOpen,NettyServer启动的时候会创建NioServerSocketChannelFactory,该factory负责创建netty放入所有channel
  2. 在NioServerSocketChannelFactory构造方法中会初始化NioWorkerPool,在该类的构造方法中创建NioWorker
  3. 在创建NioWorker的过程中,调用超类AbstractNioSelector的构造方法
  1. // NioWorker构造方法中会调用超类AbstractNioSelector的构造方法
  2. AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
  3. this.executor = executor;
  4. openSelector(determiner);
  5. }
  6. // org.jboss.netty.channel.socket.nio.AbstractNioSelector#openSelector
  7. private void openSelector(ThreadNameDeterminer determiner) {
  8. try {
  9. // open selector
  10. selector = SelectorUtil.open();
  11. } catch (Throwable t) {
  12. throw new ChannelException("Failed to create a selector.", t);
  13. }
  14. // Start the worker thread with the new Selector.
  15. boolean success = false;
  16. try {
  17. // new一个thread,将当前初始化的NioWorker作为入参,也就是说最终要运行的是NioWorker.run
  18. // 这个start方法里面会将新建的这个线程放到线程池中运行
  19. // 这里的executor就是new NioServerSocketChannelFactory时候的入参worker,也就是worker线程池
  20. DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
  21. success = true;
  22. } finally {
  23. // 省略中间代码...
  24. }
  25. assert selector != null && selector.isOpen();
  26. }
  27. // org.jboss.netty.channel.socket.nio.AbstractNioWorker#newThreadRenamingRunnable
  28. @Override
  29. protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
  30. // 这里的this就是初始化的NioWorker
  31. return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
  32. }
  33. // org.jboss.netty.channel.socket.nio.NioWorker#run
  34. @Override
  35. public void run() {
  36. // 上面DeadLockProofWorker.start里面启动的线程会调用这个run方法
  37. // 这里调用了超类的run方法,最终会调用到org.jboss.netty.channel.socket.nio.AbstractNioSelector#run
  38. // AbstractNioSelector#run
  39. super.run();
  40. recvBufferPool.releaseExternalResources();
  41. }
  42. // AbstractNioSelector#run
  43. // 这个方法是NioWorker真正处理逻辑的地方,死循环调用select接受IO事件,然后处理
  44. public void run() {
  45. thread = Thread.currentThread();
  46. int selectReturnsImmediately = 0;
  47. Selector selector = this.selector;
  48. if (selector == null) {
  49. return;
  50. }
  51. // use 80% of the timeout for measure
  52. final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
  53. boolean wakenupFromLoop = false;
  54. for (;;) {
  55. wakenUp.set(false);
  56. try {
  57. long beforeSelect = System.nanoTime();
  58. // 监听I/O事件发生
  59. int selected = select(selector);
  60. // 省略中间代码...
  61. if (shutdown) {
  62. // 省略中间代码...
  63. } else {
  64. // 处理I/O事件
  65. process(selector);
  66. }
  67. } catch (Throwable t) {
  68. // 省略中间代码...
  69. }
  70. }
  71. }

接下来到初始化ChannelEventRunnable的调用堆栈

init_ChannelEventRunnable终于到了ChannelEventRunnable开始初始化的地方,所有的ChannelEventRunnable都是在AllChannelHandler中完成初始化,并加入到线程池中执行,下面以收到connect事件为例

  1. public void connected(Channel channel) throws RemotingException {
  2. ExecutorService cexecutor = getExecutorService();
  3. try{
  4. // 初始化ChannelEventRunnable并将其加入线程池
  5. // 这里的线程池是com.alibaba.dubbo.common.threadpool.ThreadPool这个扩展,默认配置的是"fixed",也就是FixedThreadPool
  6. cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
  7. }catch (Throwable t) {
  8. throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
  9. }
  10. }

上面最终启动了ChannelEventRunnable线程,在这个线程中会最终调用到我们的SayHello方法中,这个类负责分类处理各种接收到的I/O事件

  1. // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
  2. public void run() {
  3. switch (state) {
  4. case CONNECTED:
  5. try{
  6. // 接收到连接
  7. handler.connected(channel);
  8. }catch (Exception e) {
  9. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
  10. }
  11. break;
  12. case DISCONNECTED:
  13. try{
  14. // 连接断开
  15. handler.disconnected(channel);
  16. }catch (Exception e) {
  17. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
  18. }
  19. break;
  20. case SENT:
  21. try{
  22. // 发送数据
  23. handler.sent(channel,message);
  24. }catch (Exception e) {
  25. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
  26. + ", message is "+ message,e);
  27. }
  28. break;
  29. case RECEIVED:
  30. try{
  31. // 收到数据
  32. handler.received(channel, message);
  33. }catch (Exception e) {
  34. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
  35. + ", message is "+ message,e);
  36. }
  37. break;
  38. case CAUGHT:
  39. try{
  40. // 处理异常
  41. handler.caught(channel, exception);
  42. }catch (Exception e) {
  43. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel
  44. + ", message is: " + message + ", exception is " + exception,e);
  45. }
  46. break;
  47. default:
  48. logger.warn("unknown state: " + state + ", message is " + message);
  49. }
  50. }

上面通过调用handler的相关方法来处理的,接下来看看handler是什么?

从最上面的调用堆栈里面有这些handler

  1. com.alibaba.dubbo.remoting.transport.DecodeHandler#DecodeHandler
  2. com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler
  3. // 最上面调用堆栈中com alibaba dubbo.rpcprotocol.dubbo.DubboProtocol$1.reply其实就是线面这个接口的实现类
  4. com.alibaba.dubbo.remoting.exchange.ExchangeHandler

之前在dubbo export中说过启动NettyServer的调用堆栈,但是并没有详细看每一个调用方法,这里把相关重要的方法拿出来

  1. // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
  2. private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
  3. // 这些请求received、connected、disconnected最终都会调用下面这个方法处理
  4. public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
  5. // 省略中间代码...
  6. }
  7. // 省略中间代码...
  8. }
  9. // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
  10. private ExchangeServer createServer(URL url) {
  11. // 省略中间代码...
  12. // 这里的handler就是上面初始化的,是一个匿名内部类,也就是com.alibaba.dubbo.remoting.exchange.ExchangeHandler的实现类
  13. server = Exchangers.bind(url, requestHandler);
  14. // 省略中间代码...
  15. return server;
  16. }
  17. // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
  18. public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  19. // 这里的handler就是上面bind方法传入的requestHandler
  20. // 所以这里就是初始化DecodeHandler和HeaderExchangeHandler的地方,也就说传入Transporters.bind方法的是DecodeHandler类型
  21. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  22. }

从最上面的堆栈已经知道这个handler其实就是DecodeHandler,也就是初始化ChannelEventRunnable的时候传入的handler,接下来需要弄清楚的是为什么是DecodeHandler。

上面刚说过ChannelEventRunnable的初始化是由AllChannelHandler中的某一个方法初始化的,那么作为构造参数传入ChannelEventRunnable的handler也就是WrappedChannelHandler#handler(这个类是AllChannelHandler的超类),现在要找到AllChannelHandler是怎么初始化的。

  1. // com.alibaba.dubbo.remoting.transport.netty.NettyServer#NettyServer
  2. // 上面说handler的初始化的时候,Transporters.bind方法会最终调用NettyServer的构造方法
  3. public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
  4. // 这里的handler就是DecodeHandler
  5. super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
  6. }
  7. // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrap
  8. public static ChannelHandler wrap(ChannelHandler handler, URL url){
  9. // 这里的handler是DecodeHandler
  10. return ChannelHandlers.getInstance().wrapInternal(handler, url);
  11. }
  12. // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrapInternal
  13. protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
  14. // 这里的handler是DecodeHandler
  15. // 先获取Dispatcher的扩展类,默认是com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
  16. // 然后调用AllDispatcher.dispatch方法
  17. return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
  18. .getAdaptiveExtension().dispatch(handler, url)));
  19. }
  20. // com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher#dispatch
  21. public ChannelHandler dispatch(ChannelHandler handler, URL url) {
  22. // 这里的handler是DecodeHandler,所以AllChannelHandler的超类WrappedChannelHandler#handler就是DecodeHandler
  23. return new AllChannelHandler(handler, url);
  24. }

也就是ChannelEventRunnable中的handler就是HeaderExchanger#bind方法中new出来的DecodeHandler类型的对象

filter链的构造本来也是在provider export服务的时候完成的,同理consumer端是在refer服务的时候完成filter链的构造。

consumer和provider的filter链都是在下面的类中构造的,查看前面的service_export和service_reference的调用堆栈就可以看到对该类的调用。

  1. // com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
  2. public class ProtocolFilterWrapper implements Protocol {
  3. private final Protocol protocol;
  4. public ProtocolFilterWrapper(Protocol protocol){
  5. if (protocol == null) {
  6. throw new IllegalArgumentException("protocol == null");
  7. }
  8. this.protocol = protocol;
  9. }
  10. public int getDefaultPort() {
  11. return protocol.getDefaultPort();
  12. }
  13. // service export的时候调用
  14. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  15. if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
  16. return protocol.export(invoker);
  17. }
  18. // 先构造filter链再继续后面的export
  19. return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
  20. }
  21. // consumer refer的还是调用
  22. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  23. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  24. return protocol.refer(type, url);
  25. }
  26. // 这里是先refer调用创建DubboInvoker,然后才构造filter链,因为consumer是先经过filter链,再经过DubboInvoker处理,而provider是先经过DubboProtocol处理,然后调用filter链
  27. return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
  28. }
  29. public void destroy() {
  30. protocol.destroy();
  31. }
  32. //
  33. private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
  34. Invoker<T> last = invoker;
  35. // 获取所有符合条件的filter扩展,条件包括
  36. // 1. filter扩展类上面group对应的值和要求的group(入参)一致
  37. // 2. url中也可以指定加载的filter或者剔除的filter,url配置的key就是入参的key
  38. List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
  39. if (filters.size() > 0) {
  40. for (int i = filters.size() - 1; i >= 0; i --) {
  41. final Filter filter = filters.get(i);
  42. final Invoker<T> next = last;
  43. // 每个filter使用一个Invoker包裹
  44. last = new Invoker<T>() {
  45. public Class<T> getInterface() {
  46. return invoker.getInterface();
  47. }
  48. public URL getUrl() {
  49. return invoker.getUrl();
  50. }
  51. public boolean isAvailable() {
  52. return invoker.isAvailable();
  53. }
  54. public Result invoke(Invocation invocation) throws RpcException {
  55. // 将next传入,在filter负责调用,由此构成链
  56. return filter.invoke(next, invocation);
  57. }
  58. public void destroy() {
  59. invoker.destroy();
  60. }
  61. @Override
  62. public String toString() {
  63. return invoker.toString();
  64. }
  65. };
  66. }
  67. }
  68. return last;
  69. }
  70. }

所以现在返回看最前面的调用堆栈一切应该是顺理成章了,netty接收到I/O请求后,通知到NioWorker,在NioWorker线程中经过pipeline的处理后启动了ChannelEventRunnable线程;在ChannelEventRunnable线程线程中根据接收到的不同事件调用handler的不同方法来处理,经过多个handler处理之后,经过的是filter链,最后会调用到我们编写的service方法。执行完我们的方法之后,dubo会将结果通过netty发送给consumer。

上面通过提问题的方式,解读了一些阅读源码中的关键代码,现在将service export和service reply结合起来,再去阅读源代码就就本能读懂所有主流程了,就能明白源代码为什么这么写。

版权声明:本文为sunshine-2015原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sunshine-2015/p/8379902.html