UDP 协议相较于 TCP 协议的特点:

1、无连接协议,没有持久化连接;
2、每个 UDP 数据报都是一个单独的传输单元;
3、一定的数据报丢失;
4、没有重传机制,也不管数据报是否可达;
5、速度比TCP快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。
6、常用于音频、视频场景,可以忍受一定的数据包丢失,追求速度上的提升。

    TCP 协议采用的是一种叫做单播的传输形式,UDP 协议提供了向多个接收者发送消息的额外传输形式(多播、广播):

单播(TCP 和 UDP):发送消息给一个由唯一的地址所标识的单一的网络目的地。
多播(UDP):传输给一个预定义的主机组。
广播(UDP):传输到网络(或者子网)上的所有主机。

    广播方:打开一个文件,通过 UDP 使用特殊的受限广播地址或者零网络地址 255.255.255.255,把每一行作为一个消息广播到一个指定的端口。

    接收方:通过 UDP 广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。所有的在该 UDP 端口上监听的事件监听器都将会接收到广播信息。

    下图展示了怎么将我们的 文件数据 广播为 UDP消息:所有的将要被传输的数据都被封装在了 LogEvent 消息中。 LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。

    Netty 中支持 UDP 协议主要通过以下相关类:

DatagramPacket:使用 ByteBuf 作为数据源,是 UDP 协议传输的消息容器。

DatagramChannel:扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理,它的实现类 NioDatagramChannnel 用来和远程节点通信。

Bootstrap:UDP 协议的引导类,使用 bind() 方法绑定 Channel。    

  1. public class LogEvent {
  2. public static final byte SEPARATOR = \':\';
  3. /**
  4. * IP套接字地址(IP地址+端口号)
  5. */
  6. private final InetSocketAddress inetSocketAddress;
  7. /**
  8. * 文件名
  9. */
  10. private final String logfile;
  11. /**
  12. * 消息内容
  13. */
  14. private final String msg;
  15. private final long received;
  16. /**
  17. * 用于传入消息的构造函数
  18. *
  19. * @param inetSocketAddress
  20. * @param logfile
  21. * @param msg
  22. * @param received
  23. */
  24. public LogEvent(InetSocketAddress inetSocketAddress, String logfile, String msg, long received) {
  25. this.inetSocketAddress = inetSocketAddress;
  26. this.logfile = logfile;
  27. this.msg = msg;
  28. this.received = received;
  29. }
  30. /**
  31. * 用于传出消息的构造函数
  32. *
  33. * @param logfile
  34. * @param msg
  35. */
  36. public LogEvent(String logfile, String msg) {
  37. this(null, logfile, msg, -1);
  38. }
  39. public InetSocketAddress getInetSocketAddress() {
  40. return inetSocketAddress;
  41. }
  42. public String getLogfile() {
  43. return logfile;
  44. }
  45. public String getMsg() {
  46. return msg;
  47. }
  48. public long getReceived() {
  49. return received;
  50. }
  51. }

文件实体类 LogEvent.java

  1. public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
  2. private final InetSocketAddress remoteAddress;
  3. public LogEventEncoder(InetSocketAddress remoteAddress) {
  4. this.remoteAddress = remoteAddress;
  5. }
  6. @Override
  7. protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception {
  8. byte[] file = msg.getLogfile().getBytes(CharsetUtil.UTF_8);
  9. byte[] content = msg.getMsg().getBytes(CharsetUtil.UTF_8);
  10. ByteBuf byteBuf = ctx.alloc().buffer(file.length + content.length + 1);
  11. byteBuf.writeBytes(file);
  12. byteBuf.writeByte(LogEvent.SEPARATOR);
  13. byteBuf.writeBytes(content);
  14. out.add(new DatagramPacket(byteBuf, remoteAddress));
  15. }
  16. }

编码器 LogEventEncoder.java

    该编码器实现了将 LogEvent 实体类内容转换为 DatagramPacket UDP数据报。

  1. public class LogEventBroadcaster {
  2. private final EventLoopGroup group;
  3. private final Bootstrap bootstrap;
  4. private final File file;
  5. public LogEventBroadcaster(InetSocketAddress address, File file) {
  6. group = new NioEventLoopGroup();
  7. bootstrap = new Bootstrap();
  8. bootstrap.group(group)
  9. //引导该 NioDatagramChannel(无连接的)
  10. .channel(NioDatagramChannel.class)
  11. // 设置 SO_BROADCAST 套接字选项
  12. .option(ChannelOption.SO_BROADCAST, true)
  13. .handler(new LogEventEncoder(address));
  14. this.file = file;
  15. }
  16. public void run() throws InterruptedException, IOException {
  17. //绑定 Channel,UDP 协议的连接用 bind() 方法
  18. Channel channel = bootstrap.bind(0).sync().channel();
  19. long pointer = 0;
  20. //长轮询 监听是否有新的日志文件生成
  21. while (true) {
  22. long length = file.length();
  23. if (length < pointer) {
  24. // 如果有必要,将文件指针设置到该文件的最后一个字节
  25. pointer = length;
  26. } else {
  27. RandomAccessFile raf = new RandomAccessFile(file, "r");
  28. // 确保当前的文件指针,以确保没有任何的旧数据被发送
  29. raf.seek(pointer);
  30. String line;
  31. while ((line = raf.readLine()) != null) {
  32. //对于每个日志条目,写入一个 LogEvent 到 Channel 中,最后加入一个换行符号
  33. channel.writeAndFlush(new LogEvent(file.getAbsolutePath(), line + System.getProperty("line.separator")));
  34. }
  35. pointer = raf.getFilePointer();
  36. raf.close();
  37. }
  38. try {
  39. // 休眠一秒,如果被中断,则退出循环,否则重新处理它
  40. Thread.sleep(1000);
  41. } catch (InterruptedException e) {
  42. while (!Thread.interrupted()) {
  43. break;
  44. }
  45. }
  46. }
  47. }
  48. public void stop() {
  49. group.shutdownGracefully();
  50. }
  51. public static void main(String[] args) throws IOException, InterruptedException {
  52. InetSocketAddress socketAddress = new InetSocketAddress("255.255.255.255", 8888);
  53. File file = new File("E:\\2018-09-12.log");
  54. LogEventBroadcaster logEventBroadcaster = new LogEventBroadcaster(socketAddress, file);
  55. try {
  56. logEventBroadcaster.run();
  57. } finally {
  58. logEventBroadcaster.stop();
  59. }
  60. }
  61. }

    现在,我们来测试一下这个 UDP 广播类,首先我们需要一个工具 nmap ,用它来监听 UDP 的 8888 端口,以接收我们广播的日志文件。下载地址: https://nmap.org/dist/nmap-7.70-win32.zip

    下载完成后,命令行进入安装目录,执行命令:ncat.exe -l -u -p 8888 ,监听 UDP 端口。

 

     当然,也可以自己写个测试类监听 UDP 端口,打印日志查看。这里我没有用 Netty 写监听类,直接用了 java 原生的 DatagramSocket 和 DatagramPacket 写的监听类,如下:

  1. public class UDPServer {
  2. public static void main(String[] args) {
  3. DatagramSocket server = null;
  4. try {
  5. server = new DatagramSocket(8888);
  6. byte[] datas = new byte[1024];
  7. //用一个字节数组接收UDP包,字节数组在传递给构造函数时是空的
  8. while (true) {
  9. DatagramPacket datagramPacket = new DatagramPacket(datas, datas.length);
  10. server.receive(datagramPacket);
  11. System.out.println(new String(datas));
  12. }
  13. } catch (SocketException e) {
  14. e.printStackTrace();
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. } finally {
  18. server.close();
  19. }
  20. }
  21. }

UDPServer.java

    基于 Netty 的监听类实现可以参考我上传 GitHub 上的源代码。

 

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/udp

版权声明:本文为jmcui原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/jmcui/p/9636505.html