【Netty】5 源码 Bootstrap

上一篇讲了AbstractBootstrap,为这篇做了个铺垫。

一、概述

Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.
Bootstrap: 用于客户端,只需要一个单独的Channel,来与服务端进行数据交互,对应server端的子Channel。
作用职责:EventLoop初始化,channel的注册过程 ,关于pipeline的初始化,handler的添加过程,客户端连接分析。

Netty客户端源码部分

  EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap b = new Bootstrap();
             b.group(group) // 注册线程池
              .channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
              .handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                                     //这里放入自定义助手类
                                     ch.pipeline().addLast(new EchoClientHandler());
                                 }
                             });
         
             ChannelFuture cf = b.connect(host, port).sync(); // 异步连接服务器
             cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
 
         } finally {
             group.shutdownGracefully().sync(); // 释放线程池资源
         }
     }

从上面的客户端代码虽然简单, 但是却展示了 Netty 客户端初始化时所需的所有内容:

1. EventLoopGroup: 不论是服务器端还是客户端, 都必须指定 EventLoopGroup. 在这个例子中, 
   指定了 NioEventLoopGroup, 表示一个 NIO   的EventLoopGroup.
2. ChannelType: 指定 Channel 的类型. 因为是客户端, 因此使用了 NioSocketChannel.
3. Handler: 设置数据的处理器.
4. 这里的option,提供了一系列的TCP参数

下面我们深入代码, 看一下客户端通过 Bootstrap 启动后, 都做了哪些工作.

二、源码分析

1、group(group)

 /**
  * 直接调用父类AbstractBootstrap的方法
  */
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return self();
}

直接调用父类的方法 ,说明该EventLoopGroup,作为客户端 Connector 线程,负责注册监听连接操作位,用于判断异步连接结果。

2、channel(NioServerSocketChannel.class)

在 Netty 中, Channel是一个Socket的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例。

2.1源码

/**
 * 同样也是直接调用父类AbstractBootstrap的方法
 */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

我们再来看下ReflectiveChannelFactory类

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

        private final Class<? extends T> clazz;

        /**
         * 通过构造函数 传入 clazz
         */
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        /**
         * 只用这一个方法 通过传入不同的Channel.class 创建不同的Channel 对象。
         * newChannel() 什么时候调用呢 仔细追源码 发现是在绑定 IP 和 端口的 doResolveAndConnect方法里会调用
         */
        @Override
        public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

在看channelFactory(new ReflectiveChannelFactory(channelClass)) 方法

   /**
     * 创建好Channel后,返回对象Bootstrap本身
     */
    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
        this.channelFactory = channelFactory;
        return self();
    }

因此对于我们这个例子中的客户端的 Bootstrap 而言, 生成的的 Channel 实例就是 NioSocketChannel。

2.2 Channel 类型

除了 TCP 协议以外, Netty 还支持很多其他的连接协议, 并且每种协议还有 NIO(异步 IO) 和 OIO(Old-IO, 即传统的阻塞 IO) 版本的区别. 不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应下面是一些常用的 Channel 类型:

- NioSocketChannel, 代表异步的客户端 TCP Socket 连接.
- NioServerSocketChannel, 异步的服务器端 TCP Socket 连接.
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接.
- NioSctpServerChannel, 异步的 Sctp 服务器端连接.
- OioSocketChannel, 同步的客户端 TCP Socket 连接.
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接.
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接.

3、handler(ChannelHandler handler)

Netty 的一个强大和灵活之处就是基于 Pipeline 的自定义 handler 机制. 基于此, 我们可以像添加插件一样自由组合各种各样的 handler 来完成业务逻辑. 例如我们需要处理 HTTP 数据, 那么就可以在 pipeline 前添加一个 Http 的编解码的 Handler, 然后接着添加我们自己的业务逻辑的 handler, 这样网络上的数据流就向通过一个管道一样, 从不同的 handler 中流过并进行编解码, 最终在到达我们自定义的 handler 中。

/**
 * 同样也是 直接调用父类 AbstractBootstrap 的方法
 */
public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return self();
}

不过我们看到代码 一般都是这样写的

.handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new EchoClientHandler());
        }
     })

那是因为Bootstrap.handler 方法接收一个 ChannelHandler, 而我们传递的是一个 派生于 ChannelInitializer 的匿名类, 它正好也实现了 ChannelHandler 接口. 我们来看一下, ChannelInitializer 类部分代码:

    /**
     * ChannelInboundHandlerAdapter 父类的父类 最终会继承 ChannelHandler 
     * 那么ChannelInitializer 也就是 ChannelHandler的 子类
     */
    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

        private static final InternalLogger logger
            =InternalLoggerFactory.getInstance(ChannelInitializer.class);

        /**
         * 这里只有这一个抽象类 所以我们只需重写这一个方法就可以了
         */
        protected abstract void initChannel(C ch) throws Exception;

        @Override
        @SuppressWarnings("unchecked")
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            initChannel((C) ctx.channel());
            ctx.pipeline().remove(this);
            ctx.fireChannelRegistered();
        }
   
    }

ChannelInitializer 是一个抽象类, 它有一个抽象的方法 initChannel, 我们正是实现了这个方法, 并添加的自定义的 handler 的. 那么 initChannel 是哪里被调用的呢?
答案是 ChannelInitializer.channelRegistered 方法中。
我们来关注一下 channelRegistered 方法. 从上面的源码中, 我们可以看到, 在 channelRegistered 方法中, 会调用 initChannel 方法, 将自定义的 handler 添加到 ChannelPipeline 中, 然后调用 ctx.pipeline().remove(this) 将自己从 ChannelPipeline 中删除. 上面的分析过程, 可以用如下图片展示:
一开始, ChannelPipeline 中只有三个 handler, head, tail 和我们添加的 ChannelInitializer.

接着 initChannel 方法调用后, 添加了自定义的 handler

最后将 ChannelInitializer 删除

### 4、ChannelPipeline对象

   /**
     * 我们在initChannel抽象方法的实现方法中 通过 SocketChannel获得 ChannelPipeline对象
     */
    ChannelPipeline p = ch.pipeline();
    p.addLast(newEchoClientHandler());

在实例化一个 Channel 时, 会伴随着一个 ChannelPipeline 的实例化, 并且此 Channel 会与这个 ChannelPipeline 相互关联, 这一点可以通过NioSocketChannel 的父类 AbstractChannel 的构造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    //这个可以看出
    pipeline = new DefaultChannelPipeline(this);
}

当实例化一个 Channel(这里以 EchoClient 为例, 那么 Channel 就是 NioSocketChannel), 其 pipeline 字段就是我们新创建的 DefaultChannelPipeline 对象, 那么我们就来看一下 DefaultChannelPipeline 的构造方法。

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

我们调用 DefaultChannelPipeline 的构造器, 传入了一个 channel, 而这个 channel 其实就是我们实例化的 NioSocketChannel, DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在channel 字段中。DefaultChannelPipeline 中, 还有两个特殊的字段, 即 head tail, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键。

5、.connect(host, port)

经过上面的各种分析后, 我们大致了解了 Netty 初始化时, 所做的工作, 接下来 分析一下客户端是如何发起 TCP 连接的。

   /**
     * 1、 这里 终于是Bootstrap 自己的方法了。 传入IP 地址 和 端口号
     */
    public ChannelFuture connect(String inetHost, int inetPort) {
        //通过InetSocketAddress 构造函数 绑定 IP地址+端口号
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }

   /**
     * 2、上面调用该方法 ,该方法在调用 doResolveAndConnect方法
     */
    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        validate();
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

   /**
     * 3、这步 实例化 Channer
     */
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
      
        //注意 这里 initAndRegister()方法就是实例化 Channer 的方法 上面说过 真正获取Channer 对象 是在这步获取的
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

           // 这里省略的 很大一部分逻辑判断的代码
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());   
    }

    /**
     * 3.1 这里 就开始 调 newChannel() 方法 也就创建了 Channel 对象
     */
    final ChannelFuture initAndRegister() {
        Channel channel = null;
                channel = channelFactory.newChannel();
        return regFuture;
    }

    /**
     * 4、在看doResolveAndConnect0方法
     *    这一步还是对一些 参数数据 进行校验  省略了校验代码
     */
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
           // 获取 当前 EventLoop线程
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
                   //这一步 才是 连接的关键
                    doConnect(resolveFuture.getNow(), localAddress, promise);
           
        return promise;
    }

接下来看重要的方法,在 connect 中, 会进行一些参数检查后, 最终调用的是 doConnect 方法,有关doConnect之后接下来源码,等自己对Netty了解更细致之后 ,再来写吧。

这里推荐一个博主,有关Netty源码分析的蛮好的:源码之下无秘密 ── 做最好的 Netty 源码分析教程

“`
如果一个人充满快乐,正面的思想,那么好的人事物就会和他共鸣,而且被他吸引过来。同样,一个人老带悲伤,倒霉的事情也会跟过来。

                                              ——在自己心情低落的时候,告诫自己不要把负能量带给别人。(大校13)

版权声明:本文为qdhxhz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/qdhxhz/p/10105887.html