【netty这点事儿】ByteBuf 的使用模式
最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。 这种模式被称为支撑数组
(backing array), 它能在没有使用池化的情况下提供快速的分配和释放。
让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议传输的消息。这两部分由应用程序的不同模块产生, 将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。
因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个
需要注意的是, Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了
ByteBuf 的分配方式
池化的分配 PooledByteBufAllocator是ByteBufAllocator的默认方式
可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到
ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。
非池化的分配 UnpooledByteBufAllocator
可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提供了一个简单的称为 Unpooled 的工具类, 它提供了静态的辅助方法来创建未池化的 ByteBuf实例。
netty http 代码
package http.server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; public class HttpServer { private static Log log = LogFactory.getLog(HttpServer.class); public void start(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // server端发送的是httpResponse,所以要使用HttpResponseEncoder进行编码 ch.pipeline().addLast(new HttpResponseEncoder()); // server端接收到的是httpRequest,所以要使用HttpRequestDecoder进行解码 ch.pipeline().addLast(new HttpRequestDecoder()); ch.pipeline().addLast(new HttpServerInboundHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HttpServer server = new HttpServer(); log.info("Http Server listening on 5656 ..."); server.start(5656); } }
package http.server; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpVersion; public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter { private static Log log = LogFactory.getLog(HttpServerInboundHandler.class); // private HttpRequest request; // static ByteBuf buf = Unpooled.wrappedBuffer("hello world".getBytes()); byte[] bs = "hello world".getBytes(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // if (msg instanceof HttpRequest) { // request = (HttpRequest) msg; // // String uri = request.uri(); // // System.out.println("Uri:" + uri); // } // if (msg instanceof HttpContent) { // HttpContent content = (HttpContent) msg; // ByteBuf buf = content.content(); // // System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); // buf.release(); // String res = "hello world."; // ByteBuf buf = Unpooled.wrappedBuffer(bs); // ByteBuf buf = Unpooled.directBuffer(); // ByteBuf buf = Unpooled.buffer(); // ByteBuf buf = ctx.alloc().heapBuffer();// 池化堆内存 // ByteBuf buf = ctx.alloc().directBuffer(); // 池化直接内存 // ByteBuf buf = Unpooled.buffer();// 非池化堆内存 ByteBuf buf = Unpooled.directBuffer();// 非池化堆内存 buf.writeBytes(bs); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK, buf); // response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); /* * if (HttpHeaders.isKeepAlive(request)) { response.headers().set(CONNECTION, Values.KEEP_ALIVE); } */ ctx.write(response); // ctx.flush(); // } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(cause.getMessage()); ctx.close(); } }
池化堆内存 ctx.alloc().heapBuffer()
Running 20s test @
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.76ms 9.31ms 180.34ms 92.96%
Req/Sec 138.20k 43.16k 210.22k 66.50%
10957832 requests in 20.09s, 522.51MB read
Requests/sec: 545332.08
Transfer/sec: 26.00MB
real 0m20.104s
user 0m10.441s
sys 0m44.703s
池化直接内存 ctx.alloc().directBuffer()
Running 20s test @
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.47ms 9.99ms 149.70ms 91.76%
Req/Sec 138.51k 41.31k 209.94k 63.38%
10981466 requests in 20.09s, 523.64MB read
Requests/sec: 546684.37
Transfer/sec: 26.07MB
real 0m20.098s
user 0m10.890s
sys 0m45.081s
非池化堆内存 Unpooled.buffer()
Running 20s test @
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.00ms 8.72ms 150.05ms 91.52%
Req/Sec 138.84k 42.05k 209.72k 63.81%
11017442 requests in 20.09s, 525.35MB read
Requests/sec: 548379.99
Transfer/sec: 26.15MB
real 0m20.101s
user 0m10.639s
sys 0m45.191s
非池化直接内存 Unpooled.directBuffer()
Running 20s test @
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.64ms 9.36ms 156.79ms 92.71%
Req/Sec 124.55k 33.90k 191.90k 71.61%
9890536 requests in 20.07s, 471.62MB read
Requests/sec: 492854.62
Transfer/sec: 23.50MB
real 0m20.076s
user 0m9.774s
sys 0m41.801s