第一种:activemq:

    1、从官网下载apache-activemq-5.15.3-bin.zip并解压;

    2、启动activemq:在CMD中进入解压目录,执行 bin/activemq start;启动后访问 http://127.0.0.1:8161/ ,用户名和密码默认均为admin;

    3、新建java工程,引入jar包;可以在解压的文件夹中获取如下jar包:

4、开始写代码测试;

1、生产者消费者模式(p2p模式):

  生产者代码:

package com.acmq.test.p2p;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point producer demo.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, sends
 * {@link #SEND_NUMBER} non-persistent text messages to the queue "duilie"
 * inside one transacted session, commits, and closes the connection.
 */
public class Sender {

    /** Number of messages sent per run. */
    private static final int SEND_NUMBER = 5;

    /** Timestamp format used for console output. */
    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");

    static ConnectionFactory connectionFactory;
    static Connection connection = null;
    static Session session;
    static Destination destination;
    static MessageProducer producer;

    /**
     * Runs the demo: connect, send, commit, close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // The session is transacted, so the sends below are grouped and
            // completed by the explicit commit() after the loop.
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("duilie");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != connection) {
                try {
                    connection.close();
                } catch (Exception closeFailure) {
                    // Closing is best effort, but report the failure instead of hiding it.
                    closeFailure.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends {@link #SEND_NUMBER} text messages, pausing three seconds before each one.
     *
     * <p>The console line is written right after the send and shows the text
     * that was actually put into the message.
     *
     * @param session  session used to create the messages
     * @param producer producer used to send them
     * @throws Exception if creating or sending a message fails, or the pause is interrupted
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage(" 发送的消息" + i);
            Thread.sleep(3000);
            producer.send(message);
            System.out.println(df.format(new Date()) + "发送消息:" + message.getText());
        }
    }

}

Sender.class

       消费者代码:

package com.acmq.test.p2p;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Reciver {

    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    
    static ConnectionFactory connectionFactory;
    static Connection connection = null;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;
    
    public static void main(String[] args) {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("duilie");
            consumer = session.createConsumer(destination);
            while (true) {
                //监听和receive只能使用一个
                //consumer.setMessageListener(new AcListener());
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println(df.format(new Date())+"收到消息" + message.getText());
                } else {
                    break;
                }
                Thread.sleep(3000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

}

Reciver.class

消息监听机制和receive方式接收消息只能使用一个;消息监听代码如下:

package com.acmq.test;

import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener shared by the demos.
 *
 * <p>Prints the text of a {@link TextMessage}, or the "stock", "price",
 * "offer" and "up" entries of a {@link MapMessage}. Other message types are ignored.
 */
public class AcListener implements MessageListener{
    /** Timestamp format used for console output. */
    static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");

    /**
     * Prints the received message to standard output.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage)message;
                System.out.println(dfm.format(new Date())+"收到消息" + msg.getText());
            }
            if (message instanceof MapMessage){
                MapMessage map = (MapMessage)message;
                String stock = map.getString("stock");
                double price = map.getDouble("price");
                double offer = map.getDouble("offer");
                boolean up = map.getBoolean("up");
                DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
                System.out.println(dfm.format(new Date())+"收到消息"+stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
            }
        } catch (Exception ee) {
            // Report the failure; an empty catch would hide every processing error.
            ee.printStackTrace();
        }
    }


}

2、发布者订阅者模式:publisher-subscriber

package com.acmq.test.pubsub;

import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.acmq.test.AcListener;


/**
 * Publish/subscribe consumer demo.
 *
 * <p>Opens one connection and one session against {@code tcp://localhost:61616}
 * and attaches an {@link AcListener} to each of the topics "STOCKS.0" to "STOCKS.4".
 */
public class Subscriber {

    static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");
    static ConnectionFactory factory;
    static Connection connection = null;
    static Session session;
    static MessageConsumer messageConsumer;

    /**
     * Connects to the broker and registers the topic listeners.
     *
     * @param args unused
     * @throws Exception if connecting or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        int topicIndex = 0;
        while (topicIndex < 5) {
            // Thread-based alternative to the listener:
            // new Thread(new SubThread(topicIndex, session)).start();
            listenOn("STOCKS." + topicIndex);
            topicIndex++;
        }
    }

    /** Creates a consumer for the named topic and attaches a new AcListener to it. */
    private static void listenOn(String topicName) throws Exception {
        Destination topic = session.createTopic(topicName);
        messageConsumer = session.createConsumer(topic);
        messageConsumer.setMessageListener(new AcListener());
    }

}

class SubThread implements Runnable{

    DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");
    
    public int num;
    
    public Session session;
    
    public SubThread(int num,Session session){
        this.num = num;
        this.session = session;
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                Destination destination = session.createTopic("STOCKS." + num);
                MessageConsumer messageConsumer = session.createConsumer(destination);
                MapMessage map = (MapMessage) messageConsumer.receive(100000);
                if (null != map) {
                    String stock = map.getString("stock");
                    double price = map.getDouble("price");
                    double offer = map.getDouble("offer");
                    boolean up = map.getBoolean("up");
                    DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
                    System.out.println(dfm.format(new Date())+ "收到消息" + stock + "\t" + df.format(price) + "\t"
                            + df.format(offer) + "\t" + (up ? "up" : "down"));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
    }
    
}

Subscriber

package com.acmq.test.pubsub;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

/**
 * Publish/subscribe producer demo.
 *
 * <p>Connects to {@code tcp://localhost:61616} and sends one {@link MapMessage}
 * to each of the topics "STOCKS.0" to "STOCKS.(SEND_NUMBER - 1)".
 */
public class Publisher {

    /** Number of topics, and therefore of messages sent per run. */
    public static final int SEND_NUMBER = 5;

    /** Timestamp format used for console output. */
    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    static ConnectionFactory factory;
    static Connection connection = null;
    static Session session;
    static Destination[] destinations;
    static MessageProducer producer;

    /**
     * Runs the demo: connect, create the topics, send, close.
     *
     * @param args unused
     * @throws Exception if connecting, sending or closing fails
     */
    public static void main(String[] args) throws Exception{

        factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        connection = factory.createConnection();
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // No default destination: the topic is passed on every send() call.
            producer = session.createProducer(null);
            // Set up the topics.
            destinations = new Destination[SEND_NUMBER];
            for(int i = 0; i < SEND_NUMBER; i++) {
                destinations[i] = session.createTopic("STOCKS." + i);
            }
            // Send the messages.
            sendMessage();
        } finally {
            // Close the connection on every path, including when start() or a send fails.
            connection.close();
        }
    }

    /**
     * Sends one stock message to each prepared topic and logs it.
     *
     * @throws JMSException if creating or sending a message fails
     */
    static void sendMessage() throws JMSException {
        for(int i = 0; i < SEND_NUMBER; i++) {
            Message message = createStockMessage(i, session);
            System.out.println(df.format(new Date())+ "Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
            producer.send(destinations[i], message);
        }
    }

    /**
     * Builds a map message with fixed demo values for price, offer and direction.
     *
     * @param stock   number stored (as a string) under the key "stock"
     * @param session session used to create the message
     * @return the populated map message
     * @throws JMSException if the message cannot be created or populated
     */
    static Message createStockMessage(int stock, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("stock", stock+"");
        message.setDouble("price", 1.00);
        message.setDouble("offer", 0.01);
        message.setBoolean("up", true);
        return message;
    }

}

Publisher

监听代码如上所示;

3、请求回复模式:request-response

package com.acmq.test.reqres;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Request/response client demo.
 *
 * <p>Sends the text "ClientMessage" to the queue "client" and registers a
 * {@link ClientListener} on a temporary queue that is set as the reply destination.
 */
public class MqClient {

    /**
     * Sends one request and leaves the reply listener registered.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            Connection conn = factory.createConnection();
            conn.start();
            Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination requestQueue = jmsSession.createQueue("client");
            MessageProducer requestProducer = jmsSession.createProducer(requestQueue);
            requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Temporary queue on which the reply is received.
            Destination replyQueue = jmsSession.createTemporaryQueue();
            MessageConsumer replyConsumer = jmsSession.createConsumer(replyQueue);
            replyConsumer.setMessageListener(new ClientListener());

            TextMessage request = jmsSession.createTextMessage("ClientMessage");
            request.setJMSReplyTo(replyQueue);
            request.setJMSCorrelationID(UUID.randomUUID().toString());

            requestProducer.send(request);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

Client

package com.acmq.test.reqres;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Request/response server demo.
 *
 * <p>Listens on the queue "client" and hands every message to a {@link ServerListener}.
 */
public class MqServer {

    /**
     * Starts the queue consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        setupMessageQueueConsumer();
    }

    /** Connects to tcp://localhost:61616 and registers the listener on the "client" queue. */
    private static void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            Connection conn = factory.createConnection();
            conn.start();
            Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination requestQueue = jmsSession.createQueue("client");
            MessageConsumer requestConsumer = jmsSession.createConsumer(requestQueue);
            requestConsumer.setMessageListener(new ServerListener(jmsSession));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Server

package com.acmq.test.reqres;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Listener for replies on the client side of the request/response demo.
 * Prints the text of every {@link TextMessage} it receives and ignores other types.
 */
public class ClientListener implements MessageListener{

    /**
     * Prints the reply text to standard output.
     *
     * @param message the delivered reply
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            String replyText = ((TextMessage) message).getText();
            System.out.println("收到回复: " + replyText);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

ClientListener.class

package com.acmq.test.reqres;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class ServerListener implements MessageListener {

    Session session;

    public ServerListener(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        try {
            MessageProducer replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage response = session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                System.out.println("收到消息:" + messageText);
                if("ClientMessage".equals(messageText)){
                    response.setText("ServerReply");
                    response.setJMSCorrelationID(message.getJMSCorrelationID());
                    replyProducer.send(message.getJMSReplyTo(), response);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

ServerListener

 4、测试代码;

 

版权声明:本文为liangblog原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/liangblog/p/8484666.html