.NET Core微服务之路:利用DotNetty实现一个简单的通信过程
-
服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测);
-
客户端需要开始调用服务的时候,首先去注册中心获取服务信息;
-
客户端创建远程调用连接,连接后服务端返回处理信息;
-
服务发现,向注册中心获取服务(这里需要做的有很多:拿到多个服务时需要做负载均衡,同机房过滤、版本过滤、服务路由过滤、统一网关等);
-
客户端发起调用,将需要调用的服务、方法、参数进行组装;
-
序列化编码组装的消息,这里可以使用json,也可以使用xml,也可以使用protobuf,也可以使用hessian,几种方案的序列化速度还有序列化后占用字节大小都是选择的重要指标,对内笔者建议使用高效的protobuf,它基于TCP/IP二进制进行序列化,体积小,速度快。
-
传输协议,可以使用传统的io阻塞传输,也可以使用高效的nio传输(Netty);
-
服务端收到后进行反序列化,然后进行相应的处理;
-
服务端序列化response信息并且返回;
-
客户端收到response信息并且反序列化;
-
序列化采用二进制消息,性能好/效率高(空间和时间效率都很不错);
-
序列化反序列化直接对应程序中的数据类,不需要解析后在进行映射(XML,JSON都是这种方式);
-
相比http协议,没有无用的header,简化传输数据的大小,且基于TCP层传输,速度更快,容量更小;
-
Netty等一些框架集成(重点,也是本篇介绍的主要框架);
-
使用复杂,维护成本和学习成本较高,调试困难;
-
因为基于HTTP2,绝大部多数HTTP Server、Nginx都尚不支持,即Nginx不能将GRPC请求作为HTTP请求来负载均衡,而是作为普通的TCP请求。(nginx1.9版本已支持);
-
二进制可读性差,或者几乎没有任何直接可读性,需要专门的工具进行反序列化;
-
默认不具备动态特性(可以通过动态定义生成消息类型或者动态编译支持,后续会介绍利用Rosyln进行动态编译的特性);
通信传输利器Netty(Net is DotNetty)介绍
传统通讯的问题:
解决:
(DotNetty的框架和实现是怎么回事,笔者不太清楚,但完全可参考Netty官方的文档来学习和使用DotNetty相关的API接口)
DotNetty中几个重要的库(程序集):
直接上点对点之间通讯的栗子
1 /* 2 * Netty 是一个半成品,作用是在需要基于自定义协议的基础上完成自己的通信封装 3 * Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。 4 * “快速和简单”并不意味着应用程序会有难维护和性能低的问题, 5 * Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议。 6 * 因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。 7 */ 8 9 namespace Echo.Server 10 { 11 using System; 12 using System.Threading.Tasks; 13 using DotNetty.Codecs; 14 using DotNetty.Handlers.Logging; 15 using DotNetty.Transport.Bootstrapping; 16 using DotNetty.Transport.Channels; 17 using DotNetty.Transport.Libuv; 18 using Examples.Common; 19 20 static class Program 21 { 22 static async Task RunServerAsync() 23 { 24 ExampleHelper.SetConsoleLogger(); 25 26 // 申明一个主回路调度组 27 var dispatcher = new DispatcherEventLoopGroup(); 28 29 /* 30 Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。 31 在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。 32 第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。 33 如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 IEventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。 34 */ 35 36 // 主工作线程组,设置为1个线程 37 IEventLoopGroup bossGroup = dispatcher; // (1) 38 // 子工作线程组,设置为1个线程 39 IEventLoopGroup workerGroup = new WorkerEventLoopGroup(dispatcher); 40 41 try 42 { 43 // 声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,通过链式的方式组装需要的参数 44 var serverBootstrap = new ServerBootstrap(); // (2) 45 // 设置主和工作线程组 46 serverBootstrap.Group(bossGroup, workerGroup); 47 48 if (ServerSettings.UseLibuv) 49 { 50 // 申明服务端通信通道为TcpServerChannel 51 serverBootstrap.Channel<TcpServerChannel>(); // (3) 52 } 53 54 serverBootstrap 55 // 设置网络IO参数等 56 .Option(ChannelOption.SoBacklog, 100) // (5) 57 58 // 在主线程组上设置一个打印日志的处理器 59 .Handler(new LoggingHandler("SRV-LSTN")) 60 61 // 设置工作线程参数 62 .ChildHandler( 63 /* 64 * ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。 65 * 也许你想通过增加一些处理类比如DiscardServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。 66 * 当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。 67 */ 68 new ActionChannelInitializer<IChannel>( // (4) 69 channel => 70 { 71 /* 72 * 工作线程连接器是设置了一个管道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输, 73 * 同时所有出栈的消息 也要这个管道的所有处理器进行一步步处理。 74 */ 75 IChannelPipeline pipeline = channel.Pipeline; 76 77 // 添加日志拦截器 78 pipeline.AddLast(new LoggingHandler("SRV-CONN")); 79 80 // 添加出栈消息,通过这个handler在消息顶部加上消息的长度。 81 // LengthFieldPrepender(2):使用2个字节来存储数据的长度。 82 pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); 83 84 /* 85 入栈消息通过该Handler,解析消息的包长信息,并将正确的消息体发送给下一个处理Handler 86 1,InitialBytesToStrip = 0, //读取时需要跳过的字节数 87 2,LengthAdjustment = -5, //包实际长度的纠正,如果包长包括包头和包体,则要减去Length之前的部分 88 3,LengthFieldLength = 4, //长度字段的字节数 整型为4个字节 89 4,LengthFieldOffset = 1, //长度属性的起始(偏移)位 90 5,MaxFrameLength = int.MaxValue, //最大包长 91 */ 92 pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); 93 94 // 业务handler 95 pipeline.AddLast("echo", new EchoServerHandler()); 96 })); 97 98 // bootstrap绑定到指定端口的行为就是服务端启动服务,同样的Serverbootstrap可以bind到多个端口 99 IChannel boundChannel = await serverBootstrap.BindAsync(ServerSettings.Port); // (6) 100 101 Console.WriteLine("wait the client input"); 102 Console.ReadLine(); 103 104 // 关闭服务 105 await boundChannel.CloseAsync(); 106 } 107 finally 108 { 109 // 释放指定工作组线程 110 await Task.WhenAll( // (7) 111 bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), 112 workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)) 113 ); 114 } 115 } 116 117 static void Main() => RunServerAsync().Wait(); 118 } 119 }
View Code
-
IEventLoopGroup 是用来处理I/O操作的多线程事件循环器,DotNetty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 IEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
-
ServerBootstrap 是一个启动 Transport 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
-
这里我们指定使用 TcpServerChannel类来举例说明一个新的 Channel 如何接收进来的连接。
-
ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel,当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。
-
你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。
-
绑定端口然后启动服务,这里我们在机器上绑定了机器网卡上的设置端口,当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。
-
使用完成后,优雅的释放掉指定的工作组线程,当然,你可以选择关闭程序,但这并不推荐。
Server端的事件处理代码:
上一部分代码中加粗地方的实现
1 namespace Echo.Server 2 { 3 using System; 4 using System.Text; 5 using DotNetty.Buffers; 6 using DotNetty.Transport.Channels; 7 8 /// <summary> 9 /// 服务端处理事件函数 10 /// </summary> 11 public class EchoServerHandler : ChannelHandlerAdapter // ChannelHandlerAdapter 业务继承基类适配器 // (1) 12 { 13 /// <summary> 14 /// 管道开始读 15 /// </summary> 16 /// <param name="context"></param> 17 /// <param name="message"></param> 18 public override void ChannelRead(IChannelHandlerContext context, object message) // (2) 19 { 20 if (message is IByteBuffer buffer) // (3) 21 { 22 Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8)); 23 } 24 25 context.WriteAsync(message); // (4) 26 } 27 28 /// <summary> 29 /// 管道读取完成 30 /// </summary> 31 /// <param name="context"></param> 32 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); // (5) 33 34 /// <summary> 35 /// 出现异常 36 /// </summary> 37 /// <param name="context"></param> 38 /// <param name="exception"></param> 39 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) 40 { 41 Console.WriteLine("Exception: " + exception); 42 context.CloseAsync(); 43 } 44 } 45 }
View Code
-
DiscardServerHandler 继承自 ChannelInboundHandlerAdapter,这个类实现了IChannelHandler接口,IChannelHandler提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 ChannelInboundHandlerAdapter 类而不是你自己去实现接口方法。
-
这里我们覆盖了 chanelRead() 事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是 ByteBuf。
-
为了响应或显示客户端发来的信息,为此,我们将在控制台中打印出客户端传来的数据。
-
然后,我们将客户端传来的消息通过context.WriteAsync写回到客户端。
-
当然,步骤4只是将流缓存到上下文中,并没执行真正的写入操作,通过执行Flush将流数据写入管道,并通过context传回给传来的客户端。
Client端代码:
重点看注释的地方,其他地方跟Server端没有任何区别
1 namespace Echo.Client 2 { 3 using System; 4 using System.Net; 5 using System.Text; 6 using System.Threading.Tasks; 7 using DotNetty.Buffers; 8 using DotNetty.Codecs; 9 using DotNetty.Handlers.Logging; 10 using DotNetty.Transport.Bootstrapping; 11 using DotNetty.Transport.Channels; 12 using DotNetty.Transport.Channels.Sockets; 13 using Examples.Common; 14 15 static class Program 16 { 17 static async Task RunClientAsync() 18 { 19 ExampleHelper.SetConsoleLogger(); 20 21 var group = new MultithreadEventLoopGroup(); 22 23 try 24 { 25 var bootstrap = new Bootstrap(); 26 bootstrap 27 .Group(group) 28 .Channel<TcpSocketChannel>() 29 .Option(ChannelOption.TcpNodelay, true) 30 .Handler( 31 new ActionChannelInitializer<ISocketChannel>( 32 channel => 33 { 34 IChannelPipeline pipeline = channel.Pipeline; 35 pipeline.AddLast(new LoggingHandler()); 36 pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); 37 pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); 38 39 pipeline.AddLast("echo", new EchoClientHandler()); 40 })); 41 42 IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port)); 43 44 // 建立死循环,类同于While(true) 45 for (;;) // (4) 46 { 47 Console.WriteLine("input you data:"); 48 // 根据设置建立缓存区大小 49 IByteBuffer initialMessage = Unpooled.Buffer(ClientSettings.Size); // (1) 50 string r = Console.ReadLine(); 51 // 将数据流写入缓冲区 52 initialMessage.WriteBytes(Encoding.UTF8.GetBytes(r ?? throw new InvalidOperationException())); // (2) 53 // 将缓冲区数据流写入到管道中 54 await clientChannel.WriteAndFlushAsync(initialMessage); // (3) 55 if(r.Contains("bye")) 56 break; 57 } 58 59 Console.WriteLine("byebye"); 60 61 62 await clientChannel.CloseAsync(); 63 } 64 finally 65 { 66 await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); 67 } 68 } 69 70 static void Main() => RunClientAsync().Wait(); 71 } 72 }
View Code
-
初始化一个缓冲区的大小。
-
默认缓冲区接受的数据类型为bytes[],当然这样也更加便于序列化成流。
-
将缓冲区的流直接数据写入到Channel管道中。该管道一般为链接通讯的另一端(C端)。
-
建立死循环,这样做的目的是为了测试每次都必须从客户端输入的数据,通过服务端回路一次后,再进行下一次的输入操作。
Client端的事件处理代码:
1 namespace Echo.Client 2 { 3 using System; 4 using System.Text; 5 using DotNetty.Buffers; 6 using DotNetty.Transport.Channels; 7 8 public class EchoClientHandler : ChannelHandlerAdapter 9 { 10 readonly IByteBuffer initialMessage; 11 12 public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage); 13 14 public override void ChannelRead(IChannelHandlerContext context, object message) 15 { 16 if (message is IByteBuffer byteBuffer) 17 { 18 Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8)); 19 } 20 } 21 22 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); 23 24 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) 25 { 26 Console.WriteLine("Exception: " + exception); 27 context.CloseAsync(); 28 } 29 } 30 }
View Code
实现结果
DotNetty内部调试记录分析
虽然DotNetty官方没有提供任何技术文档,但官方却提供了详细的调试记录,很多时候,我们学习者其实也可以通过调试记录来分析某一个功能的实现流程。我们可以通过将DotNetty的内部输入输出记录打印到控制台上。
InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));
可以看到服务端的打印记录一下多出来了许多许多,有大部分是属于DotNetty内部调试时的打印记录,我们只着重看如下的部分。
dbug: SRV-LSTN[0] [id: 0x3e8afca1] HANDLER_ADDED dbug: SRV-LSTN[0] [id: 0x3e8afca1] REGISTERED (1) dbug: SRV-LSTN[0] [id: 0x3e8afca1] BIND: 0.0.0.0:8007 (2) wait the client input dbug: SRV-LSTN[0] [id: 0x3e8afca1, 0.0.0.0:8007] ACTIVE (3) dbug: SRV-LSTN[0] [id: 0x3e8afca1, 0.0.0.0:8007] READ (4) dbug: SRV-LSTN[0] [id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED: [id: 0x7bac2775, 127.0.0.1:64073 :> 127.0.0.1:8007] (5) dbug: SRV-LSTN[0] [id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED_COMPLETE (6) dbug: SRV-LSTN[0] [id: 0x3e8afca1, 0.0.0.0:8007] READ (7) dbug: SRV-CONN[0] [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] HANDLER_ADDED (8) dbug: SRV-CONN[0] [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] REGISTERED (9) dbug: SRV-CONN[0] [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] ACTIVE (10) dbug: SRV-CONN[0] [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ (11) dbug: DotNetty.Buffers.AbstractByteBuffer[0] (12) -Dio.netty.buffer.bytebuf.checkAccessible: True dbug: SRV-CONN[0] [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED: 14B (13) +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |100000000| 00 0C 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |..hello world! | +--------+-------------------------------------------------+----------------+ Received from client: hello world! dbug: SRV-CONN[0] (14) [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |100000000| 00 0C |.. | +--------+-------------------------------------------------+----------------+ dbug: SRV-CONN[0] (15) [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 12B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |100000000| 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |hello world! | +--------+-------------------------------------------------+----------------+ dbug: SRV-CONN[0] (16) [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED_COMPLETE dbug: SRV-CONN[0] (17) [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] FLUSH dbug: SRV-CONN[0] (18) [id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ
咋一看,有18个操作,好像有点太多了,其实不然,还有很多很多的内部调试细节并没打印到控制台上。
-
通过手动建立的工作线程组,并将这组线程注册到管道中,这个管道可以是基于SOCKER,可以基于IChannel(1);
-
绑定自定的IP地址和端口号到自定义管道上(2);
-
激活自定义管道(3);
-
开始读取(其实也是开始监听)(4);
-
收到来自id为0x7bac2775的客户端连接请求,建立连接,并继续开始监听(5)(6)(7);
-
从第8步开始,日志已经变成id为0x7bac2775的记录了,当然一样包含注册管道,激活管道,开始监听等等与S端一模一样的操作(8)(9)(10)(11)
-
当笔者输入一条”hello world!”数据后,DotNetty.Buffers.AbstractByteBuffer会进行数据类型检查,以便确认能将数据放入到管道中。(12)
-
将数据发送到S端,数据大小为14B,hello world前有两个点,代表这是数据头,紧接着再发送两个点,但没有任何数据,代表数据已经结束。DotNetty将数据的十六进制存储位用易懂的方式表现了出来,很人性化。(13)(14)
-
S端收到数据没有任何加工和处理,马上将数据回传到C端。(15)(16)
-
最后,当这个过程完成后,需要将缓存区的数据强制写入到管道中,所以会执行一次Flush操作,整个传输完成。接下来,不管是C端还是S端,继续将自己的状态改成READ,用于监听管道中的各种情况,比如连接状态,数据传输等等(17)。
总结