ActiveMQ使用分为两大块:生产者和消费者
一、准备
在项目中导入jar包:activemq-all-5.15.3.jar
并将其添加到项目的Build Path中
 
二、生产者
  1. 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
注:
userName是ActiveMQ的用户名,默认值可以通过 ActiveMQConnection.DEFAULT_USER 获取
password是ActiveMQ的密码,默认值可以通过 ActiveMQConnection.DEFAULT_PASSWORD 获取
brokerURL是ActiveMQ的连接地址,格式为:tcp://主机名:端口(默认端口为61616)
 
  1. 获取连接
Connection connection = mqf.createConnection();
 
  1. 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
  1. 生成对应的topic
Destination destination = session.createTopic("mytopic");
 
  1. 创建生产者
MessageProducer producer = session.createProducer(destination);
 
  1. 设置发送消息使用的模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
默认是:DeliveryMode.PERSISTENT
 
  1. 生成消息
TextMessage msg = session.createTextMessage("message");
 
  1. 启动连接
connection.start();
 
  1. 发送消息
producer.send(msg);
 
  1. 关闭生产者
producer.close();
 
  1. 关闭会话
session.close();
 
  1. 关闭连接
connection.close();
 
三、消费者
  1. 实现接口
MessageListener
ExceptionListener
并实现onException(JMSException exception)和onMessage(Message message)方法
 
  1. 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
具体参数同上
 
  1. 获取连接
Connection connection = mqf.createConnection();
 
  1. 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
  1. 生成对应的topic
Destination destination = session.createTopic("mytopic");
 
  1. 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
 
  1. 启动连接
connection.start();
 
  1. 设置消息监听
consumer.setMessageListener(this);
 
  1. 设置异常监听
connection.setExceptionListener(this);
 
  1. 实现onMessage方法
该方法有一个参数Message message,这个参数是从ActiveMQ上拿到的消息,可以通过如下方法解析出来:
TextMessage tm = (TextMessage)message;
String result = tm.getText();
 
  1. 关闭消费者
consumer.close();
 
  1. 关闭会话
session.close();
 
  1. 关闭连接
connection.close();
 
四、例程
  1. 生产者实现程序
 1 package activemq_test;
 2  
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10  
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13  
14 public class Producer_tool {
15  
16     private final static String userName = ActiveMQConnection.DEFAULT_USER;
17     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
18     private final static String brokerURL = "tcp://192.168.0.5:61616";
19     private MessageProducer producer = null;
20     private Connection connection = null;
21     private Session session = null;
22     
23     public void initialize() throws JMSException {
24         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
25         connection = mqf.createConnection();
26         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
27         Destination destination = session.createTopic("mytopic");
28         producer = session.createProducer(destination);
29         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
30     }
31  
32     public void send(String message) throws JMSException {
33         initialize();
34         TextMessage msg = session.createTextMessage(message);
35         System.out.println("sending message: " + message);
36         connection.start();
37         producer.send(msg);
38     }
39     
40     public void close() throws JMSException {
41         if(producer != null) {
42             producer.close();
43         }
44         if(session != null) {
45             session.close();
46         }
47         if(connection != null) {
48             connection.close();
49         }
50         System.out.println("closed");
51     }
52     
53 }

 

  1. 生产者主程序
 1 package activemq_test;
 2 import javax.jms.JMSException;
 3 public class Producer_test {
 4     public static void main(String[] args) throws JMSException {
 5         Producer_tool producer = null;
 6         for(int i = 0; i < 10; i++) {
 7             producer = new Producer_tool();
 8             producer.send("message" + i);
 9             producer.close();
10         }
11     }
12 }
 
  1. 消费者实现程序
 1 package activemq_test;
 2  
 3 import javax.jms.Connection;
 4 import javax.jms.Destination;
 5 import javax.jms.ExceptionListener;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageConsumer;
 9 import javax.jms.MessageListener;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12  
13 import org.apache.activemq.ActiveMQConnection;
14 import org.apache.activemq.ActiveMQConnectionFactory;
15  
16 public class Consumer_tool implements MessageListener,ExceptionListener{
17     
18     private final static String userName = ActiveMQConnection.DEFAULT_USER;
19     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
20     private final static String brokerURL = "tcp://192.168.0.5:61616";
21     private Connection connection = null;
22     private Session session = null;
23     private MessageConsumer consumer = null;
24     static boolean isConnection = false;
25     
26     public void initialize() throws JMSException {
27         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
28         connection = mqf.createConnection();
29         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30         Destination destination = session.createTopic("mytopic");
31         consumer = session.createConsumer(destination);
32     }
33     
34     public void consumeMessage() throws JMSException {
35         initialize();
36         connection.start();
37         consumer.setMessageListener(this);
38         connection.setExceptionListener(this);
39         isConnection = true;
40         System.out.println("consumer is listening");
41         
42     }
43  
44     @Override
45     public void onException(JMSException exception) {
46         isConnection = false;
47     }
48  
49     @Override
50     public void onMessage(Message message) {
51         if(message instanceof TextMessage) {
52             TextMessage tm = (TextMessage)message;
53             try {
54                 System.out.println("consumer received " + tm.getText());
55             } catch (JMSException e) {
56                 e.printStackTrace();
57             }
58         }
59         else {
60             System.out.println(message);
61         }
62     }
63     
64     public void close() throws JMSException {
65         if(consumer != null) {
66             consumer.close();
67         }
68         if(session != null) {
69             session.close();
70         }
71         if(connection != null) {
72             connection.close();
73         }
74         System.out.println("consumer has closed");
75     }
76 }
 
  1. 消费者主程序
 1 package activemq_test;
 2 import javax.jms.JMSException;
 3 public class Consumer_test {
 4     public static void main(String[] args) throws JMSException {
 5         Consumer_tool consumer = new Consumer_tool();
 6         consumer.consumeMessage();
 7         while(Consumer_tool.isConnection) {
 8  
 9         }
10         consumer.close();
11     }
12 }

版权声明:本文为xiatianyu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/xiatianyu/p/9055647.html