BeetleX之TCP服务应用详解
BeetleX
是.net core
平台下的一个开源TCP
通讯组件,它不仅使用简便还提供了出色性能的支持,可以轻易让你实现上百万级别RPS吞吐的服务应用。组件所提供的基础功能也非常完善,可以让你轻易扩展自己的服务应用,以下提组件集成的功能:
-
完善的会话管理机制,可以根据连接状态和相关日志
-
专门针对内存池实现的异步流读写,支持标准Stream的同并提供高效的性能
-
消息IO合并,广播序列化合并等性能强化功能
-
提供简洁的协议扩展规范,轻易实现http,websocket,mqtt等应用通讯协议
-
支持TLS,让你构建的通讯服务更安全可靠
扩展的组件
以下是Beetlex
扩展的一些功能组件
- https://github.com/IKende/FastHttpApi
- https://github.com/IKende/Bumblebee
- https://github.com/IKende/BeetleX.Redis
- https://github.com/IKende/XRPC
- https://github.com/IKende/HttpClients
性能
一开始说组可以让你现上百万级别RPS吞吐的服务应用其实一点不假,BeetleX
的基础性能有这样的支撑能力;虽然组件不能说是.net core
上性能最好的,但在功能和综合性能上绝对非常出色(详细可以https://tfb-status.techempower.com/ 查看测试结果,可惜这个网站提交的.net core
组件比较少,大部都是基于aspcore的通讯模块扩展).以下是JSON serialization
基础输出的一个测试结果(Plaintext
在官方的测试环境一直没办法跑起来….)
在测试中组件只落后于aspcore-rhtx
这是红帽专门针对 .net core编写的linux网络驱动.
Single query
构建基础TCP应用
组件在构建TCP服务的时候非常简单,主要归功于它提供了完善的Stream
读写功能,而这些功能让你完全不用关心bytes的读写。基于Stream
的好处就是可以轻松和第三方序列化的组件进行整合。以下是简单地构建一个Hello
服务。
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } public override void SessionReceive(IServer server, SessionReceiveEventArgs e) { var pipeStream = e.Stream.ToPipeStream(); if (pipeStream.TryReadLine(out string name)) { Console.WriteLine(name); pipeStream.WriteLine("hello " + name); e.Session.Stream.Flush(); } base.SessionReceive(server, e); } }
以上就是一个简单的TCP
服务,让以代码正常运行需要引用Beetlex
最新版的组件可以在Nuget上找到。以上服务的功能很简单当接收数据后尝试从流中读取一行字符,如果读取成功则把内容写入到流中提交返回。通过以上代码是不是感觉写个服务比较简单(但是PipeStream
并不是线程安全的,所以不能涉及到多线程读写它)
协议处理规则
其实PipeStream
处理数据已经非常方便了,那为什么还需要制定一个协议处理规范呢?前面已经说了PipeStream
并不是线程安全的,很容易带来使用上的风险,所以引入协议处理规则来进行一个安全约束的同时可以实现多线程消息处理。组件提供了这样一个接口来规范消息的处理,接口如下:
public interface IPacket : IDisposable { EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed { get; set; } IPacket Clone(); void Decode(ISession session, System.IO.Stream stream); void Encode(object data, ISession session, System.IO.Stream stream); byte[] Encode(object data, IServer server); ArraySegment<byte> Encode(object data, IServer server, byte[] buffer); }
如果你要处理消息对象,则需要实现以上接口(当然这个接口的实现不是必须的,只要把握好PipeStream
安全上的控制就好);但实现这接口来处理消息可以带很多好处,可以多消息合并IO,广播消息合并序列化等高效的功能。不过在不了解组件的情况实现这个接口的确也是有些难度的,所以组件提供了一个基础的类FixedHeaderPacket
,它是一个抽像类用于描述有个消息头长的信息流处理。
字符消息分包
接下来通过FixedHeaderPacket
来实现一个简单的字符分包协议消息;主要在发送消息的时候添加一个大小头用来描述消息的长度(这是在TCP中解决粘包的主要手段)。
public class StringPacket : BeetleX.Packets.FixedHeaderPacket { public override IPacket Clone() { return new StringPacket(); } protected override object OnReader(ISession session, PipeStream stream) { return stream.ReadString(CurrentSize); } protected override void OnWrite(ISession session, object data, PipeStream stream) { stream.Write((string)data); } }
通过FixedHeaderPacket
制定一个分包规则是非常简单的,主要实现读写两个方法。下面即可在服务中引用这个包作为TCP
数据流的分析规则:
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program,StringPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { Console.WriteLine(message); server.Send($"hello {message}", session); } }
经过分析器包装后,就再也不用流来处理数据了,可以直接进行对像的发送处理。
集成Protobuf
处理String
并不是友好的事情,毕竟没有对象来得直观和操作方便;以下是通过FixedHeaderPacket
扩展Protobuf
对象传输,以下是针对Protobuf
的规则扩展:
public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket { static ProtobufPacket() { TypeHeader.Register(typeof(ProtobufClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new ProtobufPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data); } }
使用规则分析器
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { ((Messages.Register)message).DateTime = DateTime.Now; server.Send(message, session); } }
不同序列化的扩展
既然有了一个Protobuf
作为样本,那针对其他序列化的实现就比较简单了
- json
public class JsonPacket : BeetleX.Packets.FixedHeaderPacket { static JsonPacket() { TypeHeader.Register(typeof(JsonClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new JsonPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size); stream.Read(buffer, 0, size); try { return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer); } } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data); try { stream.Write(buffer.Array, buffer.Offset, buffer.Count); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array); } } }
- messagepack
public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket { static MsgpackPacket() { TypeHeader.Register(typeof(MsgpackClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new MsgpackPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data); } }