spring 整合 actionMQ(一)
前段时间配置了rabbitMQ 主要是为了公司发送大批量邮件而服务,结果锦鲤说要用actionMQ,还能说什么,肯定是actionMq的走起了。
鉴于第一次配置、使用ActionMQ,所以进行详细的配置,记录。如果有最新的用法,或者跟详细、更深入的使用,我会不定期的更新,
期待和小伙伴们共同进步。
添加MAVEN依赖
<!--添加activemq的依赖-->
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
这里有的小伙伴可能依赖了
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
毕竟一个顶的上上面的一大堆了,可出问题的时候也是来的猝不及防的。slf4j 冲突。大部分的程序中都会有slf4j,如果你报错了,就乖乖用上面的一大堆把。或者可以考虑把冲突的包从Maven程序中排除。具体怎么排除,可以百度了。
配置文件 (对于我个人来说,第一步,就是搞定配置文件,类什么的在配置的过程中再去书写)
1 <?xml version="1.0" encoding="UTF-8" ?> 2 3 <beans xmlns="http://www.springframework.org/schema/beans" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context.xsd "> 10 11 <context:annotation-config /> 12 13 <!--Activemq的连接工厂--> 14 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 15 <property name="brokerURL" value="tcp://127.0.0.1:61616" /> 16 <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> <!-- 引用重发机制 --> 17 </bean> 18 <!--spring jms为我们提供的连接池 获取一个连接工厂--> 19 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 20 <property name="targetConnectionFactory" ref="targetConnectionFactory" /> 21 </bean> 22 23 <!-- 消息目的地 点对点的模式--> 24 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 25 <constructor-arg value="SpringActiveMQMsg"/> 26 </bean> 27 <!-- 消息目的地 点对点的模式--> 28 <bean id="queueDestinationTwo" class="org.apache.activemq.command.ActiveMQQueue"> 29 <constructor-arg value="EmailMQMsg"/> 30 </bean> 31 <!-- jms模板 用于进行消息发送--> 32 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 33 <property name="connectionFactory" ref="connectionFactory"/> 34 </bean> 35 <!--注入我们的生产者--> 36 <bean class="mq.ProduceServiceImpl"/> 37 38 39 <!-- 配置消息监听器--> 40 <bean id="SimpleMsgListener" class="mq.SimpleMsgListener"/> 41 <!--配置消息容器--> 42 <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 43 <!--配置连接工厂--> 44 <property name="connectionFactory" ref="connectionFactory"/> 45 <!--配置监听的队列--> 46 <property name="destination" ref="queueDestination"/> 47 <!--配置消息监听器--> 48 <property name="messageListener" ref="SimpleMsgListener"/> 49 <!--应答模式--> 50 <property name="sessionAcknowledgeMode" value="4"></property> 51 </bean> 52 53 54 <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html --> 55 <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 56 <!--是否在每次尝试重新发送失败后,增长这个等待时间 --> 57 <property name="useExponentialBackOff" value="true"></property> 58 <!--重发次数,默认为6次 这里设置为1次 --> 59 <property name="maximumRedeliveries" value="2"></property> 60 <!--重发时间间隔,默认为1秒 --> 61 <property name="initialRedeliveryDelay" value="1000"></property> 62 <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value --> 63 <property name="backOffMultiplier" value="2"></property> 64 <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第 65 二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 --> 66 <property name="maximumRedeliveryDelay" value="1000"></property> 67 </bean> 68 </beans>
配置的第一步肯定是链接到MQ服务了。(ActionMQ 链接工厂可以添加的属性还有很多,例如 username、password …)
第二步:配置spring jms为我们提供的连接池
第三步:配置队列(我的是队列模式,也就是 1对1消费模式(一条消息消费一次),还有一个发布订阅的模式(一条消息可以被订阅的多个客户端消费))
第四步:配置模板
第五步:声明注入消息发布者(这个没必要,正常可用的类就可以,在其中引入 @Autowired private JmsTemplate jmsTemplate; 模板类,就可以发送消息)
发送消息的类
1 package mq; 2 import org.springframework.beans.factory.annotation.Autowired; 3 import org.springframework.jms.core.JmsTemplate; 4 import org.springframework.jms.core.MessageCreator; 5 import javax.annotation.Resource; 6 import javax.jms.*; 7 8 /** 9 * @ Author :tian 10 * @ Date :Created in 下午 2:21 2019/6/13 0015 11 * @ Description:生产者的实现类 12 */ 13 public class ProduceServiceImpl{ 14 @Autowired 15 private JmsTemplate jmsTemplate; 16 @Resource(name = "queueDestination") 17 private Destination destination; 18 19 /** 20 * 发送消息 21 * @param msg 22 */ 23 public void sendMessage(final String msg) { 24 jmsTemplate.convertAndSend("SpringActiveMQMsg",msg); 25 // jmsTemplate.send(destination , new MessageCreator() { 26 // @Override 27 // public Message createMessage(Session session) throws JMSException { 28 // TextMessage textMessage = session.createTextMessage(msg); 29 // return textMessage; 30 // } 31 // }); 32 System.out.println("现在发送的消息为: " + msg); 33 } 34 35 /** 36 * 接收消息 37 */ 38 public TextMessage receive() { 39 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 40 try { 41 System.out.println("从队列SpringActiveMQMsg收到了消息:\t" 42 + tm.getText()); 43 } catch (JMSException e) { 44 e.printStackTrace(); 45 } 46 47 return tm; 48 49 } 50 }
发送消息可以用以上两种方式(不只这两种,但这两个最简单。)
方式1:
jmsTemplate.convertAndSend(“SpringActiveMQMsg”,msg) // 参数一:你声明的队列的名称 (队列可以不声明,直接到ActionMQ 的控制台创建也可以,简化配置,只不过要换服务器的时候还得创建一次。) 参数二:发送的消息
方式2:
上面程序中注释的部分
jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } });
它是可以正确发送消息的 这里可以看到 发送方法也有两个参数 (参数一:声明队列的beanID 参数二:发送的消息创建者)
在这里可以看到有一个 TextMessage 类, 这个是消息的一种类型,actionMQ中消息有多种类型 JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种。ActiveMQ也有对应的实现
详细的情况参见博文:https://www.cnblogs.com/dennisit/p/4551795.html
顺便介绍一下 消息的手动接收 TextMessage tm = (TextMessage) jmsTemplate.receive(“SpringActiveMQMsg”); 使用jmsTemplate 模板 的receive(“队列名称”) ,可以一次消费一条消息。(肯定不止这一个方法了,具体的小伙伴们可以自行摸索)
消息的消费类型对应发送类型。如果想要消息自动接收消费,那就得配置消息监听器了。下面贴上消费者的代码
package mq; import javax.jms.*; /** * Created by Martin Huang on 2018/4/20. */ //bean id public class SimpleMsgListener implements MessageListener { //收到信息时的动作 @Override public void onMessage(Message message ) { TextMessage textMessage = (TextMessage) message; try { System.out.println("收到的信息:" + textMessage.getText()); textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }
这个类一定要实现 MessageListener 的(当然你可以实现其他的,千万别被我误导了)重载 方法 onMessage(Message message ) 就是消费的方法,这里我进行了消息的手动确认(默认是自动确认的,那样消息不成功也会出队,)消息不成功就不会出队。
其实这样做也不好。赶时间做ActionMQ,只能先把问题留下,以后再解决。如果有解决确认问题的好方法,欢迎留言评论,感激不尽…