前言

​ 前面我们研究了RPC的原理,市面上有很多基于RPC思想实现的框架,比如有Dubbo。今天就从Dubbo的SPI机制、服务注册与发现源码及网络通信过程去深入剖析下Dubbo。

Dubbo架构

概述

Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

调用流程:

  1. 服务容器负责启动,加载,运行服务提供者。
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

架构体系

源码结构

  • dubbo-common:公共逻辑模块: 包括Util类和通用模型
  • dubbo-remoting 远程通信模块: 相当于dubbo协议的实现,如果RPC使用RMI协议则不需要使用此包
  • dubbo-rpc 远程调用模块: 抽象各种协议,以及动态代理,包含一对一的调用,不关心集群的原理。
  • dubbo-cluster 集群模块: 将多个服务提供方伪装成一个提供方,包括负载均衡,容错,路由等,集群的地址列表可以是静态配置的,也可以是注册中心下发的.
  • dubbo-registry 注册中心模块: 基于注册中心下发的集群方式,以及对各种注册中心的抽象
  • dubbo-monitor 监控模块: 统计服务调用次数,调用时间,调用链跟踪的服务.
  • dubbo-config 配置模块: 是dubbo对外的api,用户通过config使用dubbo,隐藏dubbo所有细节
  • dubbo-container 容器模块: 是一个standlone的容器,以简单的main加载spring启动,因为服务通常不需要Tomcat/Jboss等web容器的特性,没必要用web容器去加载服务.

整体设计

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,每一层都可以剥离上层被复用,其中,Service 和Config 层为API,其它各层均为SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

  • config 配置层:对外配置接口,以 ServiceConfig , ReferenceConfig 为中心,可以直接初始化配置类,也可以通过spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端Stub 和服务器端Skeleton, 以ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务URL 为中心,扩展接口为RegistryFactory , Registry , RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster , Directory , Router , LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为MonitorFactory , Monitor , MonitorService
  • protocol 远程调用层:封装RPC 调用,以 Invocation , Result 为中心,扩展接口为Protocol , Invoker , Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request , Response 为中心,扩展接口为 Exchanger , ExchangeChannel , ExchangeClient , ExchangeServer
  • transport 网络传输层:抽象mina 和netty 为统一接口,以 Message 为中心,扩展接口为Channel , Transporter , Client , Server , Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization , ObjectInput ,ObjectOutput , ThreadPool

调用流程

对照上面的整体架构图可以大致分为以下步骤:

1、服务提供者启动,开启Netty服务,创建Zookeeper客户端,向注册中心注册服务。

2、服务消费者启动,通过Zookeeper向注册中心获取服务提供者列表,与服务提供者通过Netty建立长连接。

3、服务消费者通过接口开始远程调用服务,ProxyFactory通过初始化Proxy对象,Proxy通过创建动态代理对象。

4、动态代理对象通过invoke方法,层层包装生成一个Invoker对象,该对象包含了代理对象。

5、Invoker通过路由,负载均衡选择了一个最合适的服务提供者,在通过加入各种过滤器,协议层包装生成一个新的DubboInvoker对象。

6、再通过交换成将DubboInvoker对象包装成一个Reuqest对象,该对象通过序列化通过NettyClient传输到服务提供者的NettyServer端。

7、到了服务提供者这边,再通过反序列化、协议解密等操作生成一个DubboExporter对象,再层层传递处理,会生成一个服务提供端的Invoker对象.

8、这个Invoker对象会调用本地服务,获得结果再通过层层回调返回到服务消费者,服务消费者拿到结果后,再解析获得最终结果。

Dubbo中的SPI机制

什么是SPI

概述

在Dubbo 中,SPI 是一个非常重要的模块。基于SPI,我们可以很容易的对Dubbo 进行拓展。如果大家想要学习Dubbo 的源码,SPI 机制务必弄懂。接下来,我们先来了解一下Java SPI 与Dubbo SPI 的用法,然后再来分析Dubbo SPI 的源码。

SPI是Service Provider Interface 服务提供接口缩写,是一种服务发现机制。SPI的本质是将接口的实现类的全限定名定义在配置文件中,并有服务器读取配置文件,并加载实现类。这样就可以在运行的时候,动态为接口替换实现类。

JDK中的SPI

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。

通过一个案例我们来认识下SPI

定义一个接口:

package com.laowang;

/**
 * @author 原
 * @date 2021/3/27
 * @since 1.0
 **/
public interface User {

    String showName();
}

定义两个实现类

package com.laowang.impl;

import com.laowang.User;

/**
 * @author 原
 * @date 2021/3/27
 * @since 1.0
 **/
public class Student implements User {
    @Override
    public String showName() {
        System.out.println("my name is laowang");
        return null;
    }
}
package com.laowang.impl;

import com.laowang.User;

/**
 * @author 原
 * @date 2021/3/27
 * @since 1.0
 **/
public class Teacher implements User {
    @Override
    public String showName() {
        System.out.println("my name is zhangsan");
        return null;
    }
}

在resources目录下创建文件夹META-INF.services,并在该文件夹下创建一个名称与User的全路径一致的文件com.laowang.User

在文件中写入,两个实现类的全路径名

编写测试类:

package com.laowang;

import java.util.ServiceLoader;

/**
 * @author 原
 * @date 2021/3/27
 * @since 1.0
 **/
public class SpiTest {
    public static void main(String[] args) {
        ServiceLoader<User> serviceLoader = ServiceLoader.load(User.class);
        serviceLoader.forEach(User::showName);
    }
}

运行结果:

我们发现通过SPI机制,帮我们自动运行了两个实现类。

通过查看ServiceLoader源码:

其实通过读取配置文件中实现类的全路径类名,通过反射创建对象,并放入providers容器中。

总结:

调用过程
应用程序调用ServiceLoader.load方法,创建一个新的ServiceLoader,并实例化该类中的成员变量
应用程序通过迭代器接口获取对象实例,ServiceLoader先判断成员变量providers对象中(LinkedHashMap<String,S>类型)是否有缓存实例对象,如果有缓存,直接返回。如果没有缓存,执行类的装载,
优点
使用Java SPI 机制的优势是实现解耦,使得接口的定义与具体业务实现分离,而不是耦合在一起。应用进程可以根据实际业务情况启用或替换具体组件。
缺点
不能按需加载。虽然ServiceLoader 做了延迟载入,但是基本只能通过遍历全部获取,也就是接口的实现类得全部载入并实例化一遍。如果你并不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。
获取某个实现类的方式不够灵活,只能通过Iterator 形式获取,不能根据某个参数来获取对应的实现类。
多个并发多线程使用ServiceLoader 类的实例是不安全的。
加载不到实现类时抛出并不是真正原因的异常,错误很难定位。

Dubbo中的SPI

Dubbo 并未使用Java SPI,而是重新实现了一套功能更强的SPI 机制。Dubbo SPI 的相关逻辑被封装在了ExtensionLoader 类中,通过ExtensionLoader,我们可以加载指定的实现类。

栗子

与Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。下面来演示Dubbo SPI 的用法:
Dubbo SPI 所需的配置文件需放置在META-INF/dubbo 路径下,与Java SPI 实现类配置不同,DubboSPI 是通过键值对的方式进行配置,配置内容如下。

optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

在使用Dubbo SPI 时,需要在接口上标注@SPI 注解。

@SPI
public interface Robot {
void sayHello();
}

通过ExtensionLoader,我们可以加载指定的实现类,下面来演示Dubbo SPI :

public class DubboSPITest {
   @Test
   public void sayHello() throws Exception {
       ExtensionLoader<Robot> extensionLoader =
           ExtensionLoader.getExtensionLoader(Robot.class);
       Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
       optimusPrime.sayHello();
       Robot bumblebee = extensionLoader.getExtension("bumblebee");
       bumblebee.sayHello();
   }
}

Dubbo SPI 除了支持按需加载接口实现类,还增加了IOC 和AOP 等特性,这些特性将会在接下来的源码分析章节中一一进行介绍。

源码分析

ExtensionLoader 的getExtensionLoader 方法获取一个ExtensionLoader 实例,然后再通过ExtensionLoader 的getExtension 方法获取拓展类对象。下面我们从ExtensionLoader 的getExtension 方法作为入口,对拓展类对象的获取过程进行详细的分析。

public T getExtension(String name) {
       if (StringUtils.isEmpty(name)) {
           throw new IllegalArgumentException("Extension name == null");
       }
       if ("true".equals(name)) {
            // 获取默认的拓展实现类
           return getDefaultExtension();
       }
       // Holder,顾名思义,用于持有目标对象 就是从容器中获取,如果没有直接new一个Holder
       Holder<Object> holder = getOrCreateHolder(name);
       //获取目标对象实例
       Object instance = holder.get();
        // 如果目标对象实例为null 就需要通过双重检查创建实例
       if (instance == null) {
           synchronized (holder) {
               instance = holder.get();
               if (instance == null) {
                   // 创建拓展实例
                   instance = createExtension(name);
                   // 设置实例到 holder 中
                   holder.set(instance);
               }
           }
       }
       return (T) instance;
   }

上面代码的逻辑比较简单,首先检查缓存,缓存未命中则创建拓展对象。下面我们来看一下创建拓展对象的过程是怎样的。

private T createExtension(String name) {
   // 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
   Class<?> clazz = getExtensionClasses().get(name);
   if (clazz == null) {
       throw findException(name);
   }
   try {
       //从容器中获取对应的实例对象 如果不存在就通过反射创建
       T instance = (T) EXTENSION_INSTANCES.get(clazz);
       if (instance == null) {
           // 通过反射创建实例
           EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
           instance = (T) EXTENSION_INSTANCES.get(clazz);
       }
       // 向实例中注入依赖 下面是IOC和AOP的实现
       injectExtension(instance);
       Set<Class<?>> wrapperClasses = cachedWrapperClasses;
       if (CollectionUtils.isNotEmpty(wrapperClasses)) {
             // 循环创建 Wrapper 实例
           for (Class<?> wrapperClass : wrapperClasses) {
               // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建Wrapper 实例。
               // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给instance 变量
               instance = injectExtension(
                   (T)
wrapperClass.getConstructor(type).newInstance(instance));
           }
       }

createExtension 方法的逻辑稍复杂一下,包含了如下的步骤:

  1. 通过getExtensionClasses 获取所有的拓展类

  2. 通过反射创建拓展对象

  3. 向拓展对象中注入依赖

  4. 将拓展对象包裹在相应的Wrapper 对象中

以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是Dubbo IOC 与AOP 的具体实现。由于此类设计源码较多,这里简单的总结下ExtensionLoader整个执行逻辑:

getExtension(String name)  #根据key获取拓展对象
    -->createExtension(String name) #创建拓展实例
        -->getExtensionClasses #根据路径获取所有的拓展类
            -->loadExtensionClasses #加载拓展类
                -->cacheDefaultExtensionName #解析@SPI注解
            -->loadDirectory #方法加载指定文件夹配置文件
                -->loadResource #加载资源
                    -->loadClass #加载类,并通过 loadClass 方法对类进行缓存

Dubbo的SPI如何实现IOC和AOP的

Dubbo IOC

Dubbo IOC 是通过setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有setter 方法特征。若有,则通过ObjectFactory 获取依赖对象,最后通过反射调用setter 方法将依赖设置到目标对象中。整个过程对应的代码如下:

    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                //获取实例的所有方法
                for (Method method : instance.getClass().getMethods()) {
                    //isSetter做的事:检测方法是否以 set 开头,且方法仅有一个参数,且方法访问级别为 public
                    if (isSetter(method)) {
                        /**
                         * Check {@link DisableInject} to see if we need auto injection for this property
                         */
                        if (method.getAnnotation(DisableInject.class) != null) {
                            continue;
                        }
                        Class<?> pt = method.getParameterTypes()[0];
                        if (ReflectUtils.isPrimitives(pt)) {
                            continue;
                        }
                        try {
                            String property = getSetterProperty(method);
                            //获取依赖对象
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                //设置属性
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("Failed to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

Dubbo Aop

在说这个之前,我们得先知道装饰者模式

装饰者模式:在不改变原类文件以及不使用继承的情况下,动态地将责任附加到对象上,从而实现动态拓展一个对象的功能。它是通过创建一个包装对象,也就是装饰来包裹真实的对象。

在用Spring的时候,我们经常会用到AOP功能。在目标类的方法前后插入其他逻辑。比如通常使用Spring AOP来实现日志,监控和鉴权等功能。Dubbo的扩展机制,是否也支持类似的功能呢?答案是yes。在Dubbo中,有一种特殊的类,被称为Wrapper类。通过装饰者模式,使用包装类包装原始的扩展点实例。在原始扩展点实现前后插入其他逻辑,实现AOP功能。

一般来说装饰者模式有下面几个参与者:
Component:装饰者和被装饰者共同的父类,是一个接口或者抽象类,用来定义基本行为
ConcreteComponent:定义具体对象,即被装饰者
Decorator:抽象装饰者,继承自Component,从外类来扩展ConcreteComponent。对于ConcreteComponent来说,不需要知道Decorator的存在,Decorator是一个接口或抽象类
ConcreteDecorator:具体装饰者,用于扩展ConcreteComponent

//获取所有需要包装的类
Set<Class<?>> wrapperClasses = cachedWrapperClasses;

我们再看看cachedWrapperClasses是什么?

private Set<Class<?>> cachedWrapperClasses;

是一个set集合,那么集合是什么时候添加元素的呢?

    /**
     * cache wrapper class
     * <p>
     * like: ProtocolFilterWrapper, ProtocolListenerWrapper
     */
    private void cacheWrapperClass(Class<?> clazz) {
        if (cachedWrapperClasses == null) {
            cachedWrapperClasses = new ConcurrentHashSet<>();
        }
        cachedWrapperClasses.add(clazz);
    }

通过这个方法添加的,再看看谁调用了这个私有方法:

    /**
     * test if clazz is a wrapper class
     * <p>
     * which has Constructor with given class type as its only argument
     */
    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

原来是通过isWrapperClass这个方法,判断有没有其他对象中的构造方法中持有本对象,如果有,dubbo就认为这是个装饰类,调用装饰者类的构造方法,并返回实例对象

然后通过实例化这个包装类代替需要加载的这个类。这样执行的方法就是包装类的方法。

Dubbo中的动态编译

我们知道在Dubbo 中,很多拓展都是通过SPI 机制 进行加载的,比如Protocol、Cluster、LoadBalance、ProxyFactory 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载,即根据参数动态加载实现类。

这种在运行时,根据方法参数才动态决定使用具体的拓展,在dubbo中就叫做扩展点自适应实例。其实是一个扩展点的代理,将扩展的选择从Dubbo启动时,延迟到RPC调用时。Dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。

自适应拓展机制的实现逻辑是这样的

  1. 首先Dubbo 会为拓展接口生成具有代理功能的代码;
  2. 通过javassist 或jdk 编译这段代码,得到Class 类;
  3. 通过反射创建代理类;
  4. 在代理类中,通过URL对象的参数来确定到底调用哪个实现类;

javassist

Javassist是一个开源的分析、编辑和创建Java字节码的类库。是由东京工业大学的数学和计算机科学系的Shigeru Chiba (千叶滋)所创建的。它已加入了开放源代码JBoss 应用服务器项目,通过使用Javassist对字节码操作为JBoss实现动态AOP框架。javassist是jboss的一个子项目,其主要的优点,在于简单,而且快速。直接使用java编码的形式,而不需要了解虚拟机指令,就能动态改变类的结构,或者动态生成类。

/**
*  Javassist是一个开源的分析、编辑和创建Java字节码的类库
*  能动态改变类的结构,或者动态生成类
*/
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool:class对象容器
ClassPool pool = ClassPool.getDefault();
// 通过ClassPool生成一个User类
CtClass ctClass = pool.makeClass("com.itheima.domain.User");
// 添加属性     -- private String username
CtField enameField = new CtField(pool.getCtClass("java.lang.String"),
"username", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// 添加属性    -- private int age
CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);
//添加方法
ctClass.addMethod(CtNewMethod.getter("getUsername", enameField));
ctClass.addMethod(CtNewMethod.setter("setUsername", enameField));
ctClass.addMethod(CtNewMethod.getter("getAge", enoField));
ctClass.addMethod(CtNewMethod.setter("setAge", enoField));
// 无参构造器
CtConstructor constructor = new CtConstructor(null, ctClass);
constructor.setBody("{}");
ctClass.addConstructor(constructor);
// 添加构造函数
//ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass));
CtConstructor ctConstructor = new CtConstructor(new CtClass[]
{pool.get(String.class.getName()),CtClass.intType}, ctClass);
ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}");
ctClass.addConstructor(ctConstructor);
// 添加自定义方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser",new
CtClass[] {}, ctClass);
// 为自定义方法设置修饰符
ctMethod.setModifiers(Modifier.PUBLIC);
// 为自定义方法设置函数体
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"用户信息如下\");\n")
.append("System.out.println(\"用户名=\"+username);\n")
.append("System.out.println(\"年龄=\"+age);\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//生成一个class
Class<?> clazz = ctClass.toClass();
Constructor cons2 =
clazz.getDeclaredConstructor(String.class,Integer.TYPE);
Object obj = cons2.newInstance("itheima",20);
//反射 执行方法
obj.getClass().getMethod("printUser", new Class[] {})
.invoke(obj, new Object[] {});
// 把生成的class文件写入文件
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("D://User.class"));
fos.write(byteArr);
fos.close();
}
}

源码分析

Adaptive注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
   String[] value() default {};
}

Adaptive 可注解在类或方法上。
标注在类上:Dubbo 不会为该类生成代理类。
标注在方法上:Dubbo 则会为该方法生成代理逻辑,表示当前方法需要根据 参数URL 调用对应的扩展点实现。

dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。 先来看下创建自适应扩展类的代码

//1、看下extensionLoader的获取方法
ExtensionLoader<Robot>extensionLoader=ExtensionLoader.getExtensionLoader(Robot.class);
//2、最终调用的是ExtensionLoader的构造方法
private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
//3、getAdaptiveExtension()看看干了什么事
    public T getAdaptiveExtension() {
        //获取自适应扩展类,如果没有就开始初始化一个
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            //这里创建了一个自适应扩展类
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            } else {
                throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }
//看看createAdaptiveExtension()
 private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
//再进到getAdaptiveExtensionClass()
    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
//继续追进去createAdaptiveExtensionClass()
private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

//看看compiler
@SPI("javassist")
public interface Compiler {

    /**
     * Compile java source code.
     *
     * @param code        Java source code
     * @param classLoader classloader
     * @return Compiled class
     */
    Class<?> compile(String code, ClassLoader classLoader);

}
//其实到这里就知道了,通过生成一个类的字符串,再通过javassist生成一个对象

createAdaptiveExtensionClassCode()方法中使用一个StringBuilder来构建自适应类的Java源码。方法实现比较长,这里就不贴代码了。这种生成字节码的方式也挺有意思的,先生成Java源代码,然后编译,加载到jvm中。通过这种方式,可以更好的控制生成的Java类。而且这样也不用care各个字节码生成框架的api等。因为xxx.java文件是Java通用的,也是我们最熟悉的。只是代码的可读性不强,需要一点一点构建xx.java的内容。

服务暴露与发现

服务暴露

名词解释

在Dubbo 的核心领域模型中:

  • Invoker 是实体域,它是Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。
  • Protocol 是服务域,它是Invoker 暴露和引用的主功能入口,它负责Invoker 的生命周期管理。
    export:暴露远程服务
    refer:引用远程服务
  • proxyFactory:获取一个接口的代理类
    getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
    getProxy:针对client端,创建接口的代理对象,例如DemoService的接口。
  • Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等

整体流程

在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务暴露原理

在整体上看,Dubbo 框架做服务暴露分为两大部分 , 第一步将持有的服务实例通过代理转换成Invoker, 第二步会把Invoker 通过具体的协议 ( 比如Dubbo ) 转换成Exporter, 框架做了这层抽象也大大方便了功能扩展 。

服务提供方暴露服务的蓝色初始化链,时序图如下:

源码分析

服务导出的入口方法是ServiceBean 的onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到Spring 上下文刷新事件后执行服务导出操作。方法代码如下:

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

通过export最终找到doExportUrls()方法

private void doExportUrls() {
    	//加载配置文件中的所有注册中心,并且封装为dubbo内部的URL对象列表
        List<URL> registryURLs = loadRegistries(true);
        //循环所有协议配置,根据不同的协议,向注册中心中发起注册
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            //服务暴露方法
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrlsFor1Protocol()方法代码老多了,我们只关系核心的地方

...
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            //本地暴露,将服务数据记录到本地JVM中
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
           //远程暴露,向注册中心发送数据
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
		                    url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
	                    }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
						// 为服务提供类(ref)生成 Invoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                       // DelegateProviderMetaDataInvoker 用于持有 Invoker 和ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 导出服务,并生成 Exporter
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    //不存在注册中心,仅导出服务
                    ....
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);

上面代码根据url 中的scope 参数决定服务导出方式,分别如下:
scope = none,不导出服务
scope != remote,导出到本地
scope != local,导出到远程

不管是导出到本地,还是远程。进行服务导出之前,均需要先创建Invoker,这是一个很重要的步骤。因此下面先来分析Invoker 的创建过程。Invoker 是由ProxyFactory 创建而来,Dubbo 默认的ProxyFactory 实现类是JavassistProxyFactory。下面我们到JavassistProxyFactory 代码中,探索Invoker 的创建过程。如下:

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 为目标类创建warpper
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //创建匿名才invoker对象,并实现doinvoke方法
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

Invoke创建成功之后,接下来我们来看本地导出

    /**
     * always export injvm
     */
    private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL) // 设置协议头为 injvm
                .setHost(LOCALHOST_VALUE)//本地ip:127.0.0.1
                .setPort(0)
                .build();
        // 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的export 方法
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

exportLocal 方法比较简单,首先根据URL 协议头决定是否导出服务。若需导出,则创建一个新的URL并将协议头、主机名以及端口设置成新的值。然后创建Invoker,并调用InjvmProtocol 的export 方法导出服务。下面我们来看一下InjvmProtocol 的export 方法都做了哪些事情。

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

如上,InjvmProtocol 的export 方法仅创建了一个InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了。

再看看导出服务到远程

接下来,我们继续分析导出服务到远程的过程。导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑。我们把目光移动到RegistryProtocol 的export 方法上。

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 获取注册中心 URL
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

	    //导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

	    // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);

	    //获取已注册的服务提供者 URL,
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            // 向注册中心注册服务
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        //  向注册中心进行订阅 override 数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        // 创建并返回 DestroyableExporter
        return new DestroyableExporter<>(exporter);
    }

上面代码看起来比较复杂,主要做如下一些操作:

  1. 调用doLocalExport 导出服务
  2. 向注册中心注册服务
  3. 向注册中心进行订阅override 数据
  4. 创建并返回DestroyableExporter

看看doLocalExport 做了什么

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            //protocol和配置的协议相关(dubbo:DubboProtocol)
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

接下来,我们把重点放在Protocol 的export 方法上。假设运行时协议为dubbo,此处的protocol 变量会在运行时加载DubboProtocol,并调用DubboProtocol 的export 方法。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        String key = serviceKey(url);
        //创建DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter); //key:接口 (DemoService)

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
		//启动服务
        openServer(url);
        //优化序列器
        optimizeSerialization(url);

        return exporter;
    }

如上,我们重点关注DubboExporter 的创建以及openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。下面分析openServer 方法。

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            //访问缓存
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        //创建服务器实例
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

接下来分析服务器实例的创建过程。如下

    private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        
		// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 创建 ExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        
		// 获取 client 参数,可指定 netty,mina
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
      		 // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

如上,createServer 包含三个核心的逻辑。

第一是检测是否存在server 参数所代表的Transporter 拓展,不存在则抛出异常。

第二是创建服务器实例。

第三是检测是否支持client 参数所表示的Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 获取 Exchanger,默认为 HeaderExchanger。
   		// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
        return getExchanger(url).bind(url, handler);
    }

上面代码比较简单,就不多说了。下面看一下HeaderExchanger 的bind 方法。

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
        //   1. new HeaderExchangeHandler(handler)
        //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
        //   3. Transporters.bind(url, new DecodeHandler(new
HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
    }

HeaderExchanger 的bind 方法包含的逻辑比较多,但目前我们仅需关心Transporters 的bind 方法逻
辑即可。该方法的代码如下:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        } else if (handlers != null && handlers.length != 0) {
            Object handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
                handler = new ChannelHandlerDispatcher(handlers);
            }
			// 获取自适应 Transporter 实例,并调用实例方法
            return getTransporter().bind(url, (ChannelHandler)handler);
        } else {
            throw new IllegalArgumentException("handlers == null");
        }
    }

如上,getTransporter() 方法获取的Transporter 是在运行时动态创建的,类名为TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的URL 参数决定加载什么类型的Transporter,默认为NettyTransporter。调用 NettyTransporter.bind(URL,ChannelHandler) 方法。创建一个 NettyServer 实例。调用 NettyServer.doOPen() 方法,服务器被开启,服务也被暴露出来了。

服务注册

本节内容以Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册
的入口方法开始分析,我们把目光再次移到RegistryProtocol 的export 方法上。如下:

进入到register()方法

    public void register(URL registryUrl, URL registeredProviderUrl) {
        //获得注册中心实例
        Registry registry = registryFactory.getRegistry(registryUrl);
        //进行注册
        registry.register(registeredProviderUrl);
    }

看看getRegistry()方法

    @Override
    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

进入createRegistry()方法

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //// 获取组名,默认为 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加状态监听器
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

在上面的代码代码中,我们重点关注ZookeeperTransporter 的connect 方法调用,这个方法用于创建
Zookeeper 客户端。创建好Zookeeper 客户端,意味着注册中心的创建过程就结束了。

搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。

    public void doRegister(URL url) {
        try {
            // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
       //  /${group}/${serviceInterface}/providers/${url}
       // 比如 /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    @Override
    public void create(String path, boolean ephemeral) {
        if (!ephemeral) {
            // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
            if (checkExists(path)) {
                return;
            }
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // 递归创建上一级路径
            create(path.substring(0, i), false);
        }
        // 根据 ephemeral 的值创建临时或持久节点
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }

好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。

总结

  1. 在有注册中心,需要注册提供者地址的情况下,ServiceConfig 解析出的URL 格式为:registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode(“dubbo://service-host/{服务名}/{版本号}”)
  2. 基于Dubbo SPI 的自适应机制,通过URL registry:// 协议头识别,就调用RegistryProtocol#export() 方法
  3. 将具体的服务类名,比如 DubboServiceRegistryImpl ,通过ProxyFactory 包装成Invoker 实例
  4. 调用doLocalExport 方法,使用DubboProtocol 将Invoker 转化为Exporter 实例,并打开Netty 服务端监听客户请求
  5. 创建Registry 实例,连接Zookeeper,并在服务节点下写入提供者的URL 地址,注册服务
  6. 向注册中心订阅override 数据,并返回一个Exporter 实例
  7. 根据URL 格式中的 “dubbo://service-host/{服务名}/{版本号}” 中协议头 dubbo:// 识别,调用DubboProtocol#export() 方法,开发服务端口
  8. RegistryProtocol#export() 返回的Exporter 实例存放到ServiceConfig 的 List<Exporter>exporters

版权声明:本文为whgk原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/whgk/p/14586249.html