Netty 的简单应用示例
一、简单的聊天室程序
public class ChatClient { public static void main(String[] args) throws InterruptedException, IOException { NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())) .addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }); } }); ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); Channel channel = future.channel(); while(true){ channel.writeAndFlush(br.readLine()+"\n"); } }finally{ nioEventLoopGroup.shutdownGracefully().sync(); } } }
public class ChatServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() // .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())) .addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new ChatServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(8888).sync(); future.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } }
public class ChatServerHandler extends SimpleChannelInboundHandler<String>{ public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); group.forEach(ch ->{ if(channel != ch){ ch.writeAndFlush(channel.remoteAddress()+": "+ msg+"\n"); }else{ ch.writeAndFlush("自己: "+msg+"\n"); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); group.writeAndFlush("服务器: "+channel.remoteAddress()+"加入"); group.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); group.writeAndFlush("服务器 : "+channel.remoteAddress() + "离开"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+" 上线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+" 下线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
二、使用netty传递对象,使用jdk自带的序列化
public class SubReqClient { public static void main(String[] args) { start(); } public static void start(){ NioEventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled( this.getClass().getClassLoader() ))) .addLast("encoder", new ObjectEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User user = new User(); user.setName("runtu"); ctx.writeAndFlush(user); } }); } }); ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ group.shutdownGracefully(); } } }
public class SubReqServer { public static void main(String[] args) { start(); } public static void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled( this.getClass().getClassLoader() ))) .addLast("encoder", new ObjectEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { User user1 = (User)msg; System.out.println(user1.getName()); User user = new User(); user.setName("AQ"); ctx.writeAndFlush(user); } }); } }); ChannelFuture future = serverBootstrap.bind(8888).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
/**
 * Serializable bean with a name, an age and an address, exchanged between the client and the
 * server of this example.
 */
public class User implements Serializable {

    /**
     * Explicit stream version, so that both peers agree on it regardless of how each side
     * compiled the class.
     */
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    /** Returns {@code User [name=..., age=..., address=...]}. */
    @Override
    public String toString() {
        return "User [name=" + name + ", age=" + age + ", address=" + address + "]";
    }
}
三、使用messagepack进行序列化传递实体对象
由于java自带的序列化工具有着性能低、序列化后的码流大且不支持跨语言等各种缺陷,所以我们使用Msgpack库来进行序列化然后进行数据的传输。
我们需要额外引入两个jar包javassist-3.18.1-GA.jar 和msgpack-0.6.12.jar
public class NioClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootStrap = new Bootstrap(); bootStrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2)) .addLast(new MsgpackDecoder()) .addLast("frameEncoder",new LengthFieldPrepender(2)) .addLast(new MsgpackEncoder()) .addLast(new ClientHandler()); } }); ChannelFuture future = bootStrap.connect("localhost", 9999).sync(); future.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); } } }
public class ClientHandler extends SimpleChannelInboundHandler { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送50个UserInfo给服务器,由于启用了粘包/拆包支持,所以这里连续发送多个也不会出现粘包的现象。 for (int i = 0; i < 50; i++) { User user = new User(); user.setName("张三"+i); ctx.write(user); } ctx.flush(); System.out.println("-----------------send over-----------------"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("error"); } }
public class NioServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2)) .addLast(new MsgpackDecoder()) .addLast("frameEncoder",new LengthFieldPrepender(2)) .addLast(new MsgpackEncoder()) .addLast(new ServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(9999).sync(); future.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
public class ServerHandler extends SimpleChannelInboundHandler { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { try { //直接输出msg System.out.println(msg.toString()); User user = new User(); user.setName("李四"); //回复has receive 给客户端 ctx.write(user); } catch (Exception e) { e.printStackTrace(); }finally { } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
/**
 * Decodes the readable bytes of an inbound {@code ByteBuf} with MessagePack and passes the
 * result on to the next handler.
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Reused for every message instead of being rebuilt per call. Each pipeline in this example
     * creates its own decoder, so the instance is not shared between channels.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Copies the readable bytes of {@code buf} without moving its reader index, deserializes
     * them and adds the result to {@code out}.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        final int length = buf.readableBytes();
        final byte[] array = new byte[length];
        buf.getBytes(buf.readerIndex(), array, 0, length);
        out.add(messagePack.read(array));
    }
}
/** Serializes outbound objects with MessagePack and writes the bytes to the outbound buffer. */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    /**
     * Reused for every message instead of being rebuilt per call. Each pipeline in this example
     * creates its own encoder, so the instance is not shared between channels.
     */
    private final MessagePack messagePack = new MessagePack();

    /** Writes the MessagePack form of {@code obj} into {@code out}. */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object obj, ByteBuf out) throws Exception {
        byte[] info = messagePack.write(obj);
        out.writeBytes(info);
    }
}
/**
 * MessagePack-annotated bean with a name, an age and an address. The fields are kept in their
 * original declaration order.
 */
@Message
public class User {

    private String name;
    private int age;
    private String address;

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getName() {
        return this.name;
    }

    public int getAge() {
        return this.age;
    }

    public String getAddress() {
        return this.address;
    }

    /** Returns {@code User [name=..., age=..., address=...]}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [name=");
        text.append(this.name);
        text.append(", age=").append(this.age);
        text.append(", address=").append(this.address);
        return text.append("]").toString();
    }
}
四、使用Marshalling来对对象进行序列化,这种序列化方式需要对象实现Serializable接口
需要引入jar包 jboss-marshalling-2.0.6.Final.jar和jboss-marshalling-serial-2.0.6.Final.jar
public class SubReqClient { public static void main(String[] args) { start(); } public static void start(){ NioEventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()) .addLast(MarshallingCodeCFactory.buildMarshallingEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User user = new User(); user.setName("runtu"); ctx.writeAndFlush(user); } }); } }); ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ group.shutdownGracefully(); } } }
public class SubReqServer { public static void main(String[] args) { start(); } public static void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()) .addLast(MarshallingCodeCFactory.buildMarshallingEncoder()) .addLast(new SimpleChannelInboundHandler<Object>() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { User user1 = (User)msg; System.out.println(user1.getName()); User user = new User(); user.setName("AQ"); ctx.writeAndFlush(user); } }); } }); ChannelFuture future = serverBootstrap.bind(8888).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
/**
 * Builds the Marshalling decoder and encoder used by the pipelines in this example. Both use
 * the "serial" marshaller factory and a configuration with version 5.
 */
public class MarshallingCodeCFactory {

    /** Returns a new decoder built from an unmarshaller provider and the value 1024. */
    public static MarshallingDecoder buildMarshallingDecoder() {
        return new MarshallingDecoder(
            new DefaultUnmarshallerProvider(serialFactory(), versionFiveConfiguration()), 1024);
    }

    /** Returns a new encoder built from a marshaller provider. */
    public static MarshallingEncoder buildMarshallingEncoder() {
        return new MarshallingEncoder(
            new DefaultMarshallerProvider(serialFactory(), versionFiveConfiguration()));
    }

    /** Looks up the provided marshaller factory named "serial". */
    private static MarshallerFactory serialFactory() {
        return Marshalling.getProvidedMarshallerFactory("serial");
    }

    /** Creates a fresh configuration with its version set to 5. */
    private static MarshallingConfiguration versionFiveConfiguration() {
        MarshallingConfiguration settings = new MarshallingConfiguration();
        settings.setVersion(5);
        return settings;
    }
}
/**
 * Serializable bean with a name, an age and an address, exchanged between the client and the
 * server of this example.
 */
public class User implements Serializable {

    /**
     * Explicit stream version, so that both peers agree on it regardless of how each side
     * compiled the class.
     */
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    /** Returns {@code User [name=..., age=..., address=...]}. */
    @Override
    public String toString() {
        return "User [name=" + name + ", age=" + age + ", address=" + address + "]";
    }
}
五、简单的http文件服务器
public class HttpServer { public static void main(String[] args) { start(); } public static void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new HttpRequestDecoder()) .addLast(new HttpObjectAggregator(65535)) .addLast(new HttpResponseEncoder()) .addLast(new ChunkedWriteHandler()) .addLast(new FileServerHandler1()); } }); ChannelFuture future = serverBootstrap.bind(8888).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
public class FileServerHandler1 extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if(request.uri().equals("/favicon.ico")){ return; } if(request.method() == HttpMethod.GET){ String uri = request.uri(); if(uri.startsWith("/file")){ File file = new File("D:\\12306Bypass\\使用须知.txt"); RandomAccessFile randomAccessFile = null; randomAccessFile = new RandomAccessFile(file, "r"); long fileLength = randomAccessFile.length(); HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) .set("Content-Length", fileLength) .set("Content-Type","text/html") .set("Content-Disposition", "attachment;fileName="+"a.txt"); ctx.write(response); ctx.write(new ChunkedFile(randomAccessFile, 0,fileLength,8192),ctx.newProgressivePromise()); ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); }else{ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.headers() .set("Content-Type", "text/plain;charset=utf-8") .set("Content_Length", response.content().readableBytes()) .set("aaa", "cccc"); response.content().writeBytes(Unpooled.copiedBuffer("服务器返回信息",CharsetUtil.UTF_8)); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } } } private static void setContentTypeHeader(HttpResponse response, File file){ MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap(); response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath())); } }
六、netty编写websocket服务器
public class WebSocketServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(65536)) .addLast(new ChunkedWriteHandler()) .addLast(new WebSocketServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(8080).sync(); future.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } }
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{ private WebSocketServerHandshaker handshaker; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof FullHttpRequest){ HandleHttpRequest(ctx,(FullHttpRequest)msg); } if(msg instanceof WebSocketFrame){ handleWebSocket(ctx,(WebSocketFrame)msg); } } private void HandleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest request){ WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( "ws://localhost:8888/websocket",null,false); handshaker = factory.newHandshaker(request); if(handshaker == null){ WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(),request); } } private void handleWebSocket(ChannelHandlerContext ctx,WebSocketFrame frame){ if(frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); } if(frame instanceof PingWebSocketFrame){ ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if(!(frame instanceof TextWebSocketFrame)){ return; } String requestStr = ((TextWebSocketFrame) frame).text(); System.out.println(requestStr); ctx.channel().writeAndFlush(new TextWebSocketFrame("欢迎使用netty websocket: "+ LocalDateTime.now())); } }
<html>
<head>
    <meta charset="UTF-8">
    <title>Netty WebSocket服务器</title>
</head>
<body>
<script type="text/javascript">
    // Test page: opens a WebSocket to the server and shows each reply in the text area.
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8080/websocket");
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = '';
            ta.value = event.data;
        };
        socket.onopen = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "打开WebSocket服务器正常";
        };
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "webSocket 关闭";
        };
    }

    // Sends the input text. `socket` is undefined when the browser has no WebSocket support,
    // so it is checked before its state is read.
    function send() {
        if (socket && socket.readyState == WebSocket.OPEN) {
            var message = document.getElementById('message').value;
            socket.send(message);
        } else {
            alert("WebSocket 链接未建立成功!!");
        }
    }
</script>
<form onsubmit="return false;">
    <input id="message" type="text" name="message" value="Netty最佳实践"/>
    <br/><br/>
    <input type="button" value="发送" onclick="send()"/>
    <hr color="blue"/>
    <h3>服务器应答消息</h3>
    <textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>