前言:

两年前做过spring+activemq+stomp的ws推送,那个做起来很简单,但现在公司用的mq中间件是rabbitmq,因此需要通过rabbitmq去做ws通信。仔细搜了搜百度/谷歌,网上通过spring boot+rabbitmq+stomp的教程文章倒是一搜一大把,可惜目前的项目是非spring boot的,没法套用。只好自己去捣鼓。搞了几个小时,终于弄出来了,特此与大家分享下。

RabbitMQ:

 怎么安装就不是本篇讨论的话题了,自己百度/谷歌之。rabbitmq默认自带了stomp插件,但是需要自己启用。命令为:

  1. rabbitmq-plugins enable rabbitmq_stomp

来来来,给个文档地址参考参考,http://www.rabbitmq.com/stomp.html。默认用guest用户去连接,密码也是guest。

这里有个问题,看rabbitmq配置文件,stomp协议端口默认是61613,但是用ws协议连接却始终连接不上,所以只能用web stomp端口,端口号是15674,这个跟activemq有所区别。(P.S. 此处最好有大神来解惑,或者告知如何用61613来连

Javascript:

前端代码撸起来最方便,关键是调试也容易,因此先来。

  1. var stompClient = null;
  2.  
  3. var headers = {
  4. login: 'guest',
  5. passcode: 'guest'
  6. };
  7.  
  8. function wsConnect(url) {
  9. var ws = new SockJS(url);
  10. stompClient = Stomp.over(ws);
  11.  
  12. //var ws = new WebSocket(url);
  13. //stompClient = Stomp.over(ws);
  14.  
  15. // SockJS does not support heart-beat: disable heart-beats
  16. stompClient.heartbeat.outgoing = 0;
  17. stompClient.heartbeat.incoming = 0;
  18.  
  19. stompClient.connect(headers, function (frame) {
  20. console.log('Connected: ' + frame);
  21.  
  22. stompClient.subscribe('/topic/test', function (sms) {
  23. var obj = JSON.parse(sms.body)
  24. var count = obj.totalCount;
  25.  
  26. console.log("count: " + count);
  27. });
  28.  
  29. });
  30. }

然后就连接呗。

  1. $(function(){
  2. var url = "http://host:15674/stomp";
  3. wsConnect(url);
  4. });

 撸完准备测试,当然是选择chrome喽,页面加载后,打开console控制台,可以看到web socket连上了,前端大功告成。

  

Java:

定义一个StompService类专门用来发送stomp消息。

  1. import org.springframework.beans.factory.annotation.Value;
  2. import org.springframework.messaging.converter.StringMessageConverter;
  3. import org.springframework.messaging.simp.stomp.StompHeaders;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.socket.WebSocketHttpHeaders;
  6. import org.springframework.web.socket.client.WebSocketClient;
  7. import org.springframework.web.socket.client.standard.StandardWebSocketClient;
  8. import org.springframework.web.socket.messaging.WebSocketStompClient;
  9. import org.springframework.web.socket.sockjs.client.SockJsClient;
  10. import org.springframework.web.socket.sockjs.client.Transport;
  11. import org.springframework.web.socket.sockjs.client.WebSocketTransport;
  12.  
  13. import javax.annotation.PostConstruct;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16.  
  17. /**
  18. * stomp服务 rabbitmq做中间件
  19. * @author Selwyn
  20. * @version $Id: WebSocketConfig.java, v 0.1 9/7/2018 9:59 AM Selwyn Exp $
  21. */
  22. @Component
  23. public class StompService {
  24.  
  25. private static final String URL_TEMPLATE = "http://%s:%s/stomp";
  26.  
  27. @Value("${rabbit.host}")
  28. private String host;
  29.  
  30. //@Value("${rabbit.stomp.port}")
  31. private Integer port = 15674;
  32.  
  33. /**
  34. * 连接用户名
  35. */
  36. //@Value("${rabbit.stomp.login}")
  37. private String login = "guest";
  38. /**
  39. * 连接密码
  40. */
  41. //@Value("${rabbit.stomp.passCode}")
  42. private String passCode = "guest";
  43.  
  44. private String url;
  45.  
  46. @PostConstruct
  47. public void init()
  48. {
  49. url = String.format(URL_TEMPLATE, host, port);
  50. }
  51.  
  52. /**
  53. * 发送stomp消息
  54. * @param dest 目的地 比如/topic/test
  55. * @param toSend 要发送的信息
  56. * @param <T>
  57. */
  58. public <T> void connectAndSend(String dest, T toSend)
  59. {
  60. WebSocketClient client = new StandardWebSocketClient();
  61.  
  62. List<Transport> transports = new ArrayList<>(1);
  63. transports.add(new WebSocketTransport( client) );
  64. //走web stomp的路,必须用socketjs的方式连接,别问我为什么,我也不清楚
  65. WebSocketClient transport = new SockJsClient(transports);
  66. WebSocketStompClient stompClient = new WebSocketStompClient(transport);
  67. //StompSessionHandlerAdapter默认的payload类型是String, 因此MessageConverter必须是StringMessageConverter
  68. stompClient.setMessageConverter(new StringMessageConverter());
  69.  
  70. final CustomStompSessionHandler sessionHandler =
  71. new CustomStompSessionHandler(dest, toSend);
  72.  
  73. WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
  74. headers.setSecWebSocketProtocol("13");
  75.  
  76. //连接用户名/密码也是必须的,否则连不上
  77. StompHeaders sHeaders = new StompHeaders();
  78. sHeaders.add("login", this.login);
  79. sHeaders.add("passcode", this.passCode);
  80.  
  81. //开始连接,回调连接上后发送stomp消息
  82. stompClient.connect(url, headers, sHeaders, sessionHandler);
  83.  
  84. //要同步得到发送结果的话,用CountDownLatch来做或者connect结果的future对象做get
  85. }
  86.  
  87. }

然后编写回调handler类。

  1. import com.alibaba.fastjson.JSON;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.messaging.simp.stomp.StompCommand;
  4. import org.springframework.messaging.simp.stomp.StompHeaders;
  5. import org.springframework.messaging.simp.stomp.StompSession;
  6. import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
  7.  
  8.  
  9. /**
  10. * 自定义stomp session 回调handler
  11. * @author Selwyn
  12. * @version $Id: CustomStompSessionHandler.java, v 0.1 9/7/2018 3:43 PM Selwyn Exp $
  13. */
  14. @Slf4j
  15. public class CustomStompSessionHandler extends StompSessionHandlerAdapter {
  16.  
  17. /**
  18. * 要发送的对象,将会json化传输出去
  19. */
  20. private Object toSend;
  21.  
  22. /**
  23. * 目的地,一般是topic地址
  24. */
  25. private String dest;
  26.  
  27. public CustomStompSessionHandler(String dest, Object toSend) {
  28. this.toSend = toSend;
  29. this.dest = dest;
  30. }
  31.  
  32. @Override
  33. public void handleFrame(StompHeaders headers, Object payload) {
  34. super.handleFrame(headers, payload);
  35. }
  36.  
  37. @Override
  38. public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
  39. super.afterConnected(session, connectedHeaders);
  40. String msg = JSON.toJSONString(toSend);
  41. try{
  42. session.send(dest, msg);
  43. }catch(Exception e)
  44. {
  45. log.error("failed to send stomp msg({}) to destination {}", msg, dest);
  46. }finally {
  47. //做完了关闭呗
  48. session.disconnect();
  49. }
  50. }
  51.  
  52. @Override
  53. public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
  54. super.handleException(session, command, headers, payload, exception);
  55. log.error("stomp error: {}", exception);
  56. }
  57.  
  58. @Override
  59. public void handleTransportError(StompSession session, Throwable exception) {
  60. super.handleTransportError(session, exception);
  61. log.error("stomp transport error: {}", exception);
  62. }
  63.  
  64. public void setToSend(Object toSend) {
  65. this.toSend = toSend;
  66. }
  67.  
  68. public void setDest(String dest) {
  69. this.dest = dest;
  70. }
  71. }

再自己写个controller或者写个单元测试方法,这里就不给出代码了,撸完后启动服务,就可以测试消息推送了,实践证明,真香!

结尾:

其实整个过程还没完,需要考虑到连接中断等情况,客户端和服务后台都需要做好重连机制。通过sockjs这种方式连接是没有心跳机制的,这个比activemq带的stomp插件要low。个人建议,如果能用spring boot的话尽量用那种方式去实现stomp消息推送。

  

版权声明:本文为selwynHome原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/selwynHome/p/9609298.html