本文介绍一个笔者在实际工作中的实施的基于ActiveMQ的一个高稳定,可扩展的异步消息系统。

ActiveMQ是一个成熟的基于Java语言的开源消息系统,在实际应用中被大量使用。ActiveMQ在系统稳定性,系统的容错和扩展等方面都有很多成熟的方案,也有很多开源的管理工具,是部署异步消息系统的一个很好的选择。

ActiveMQ工作机制

ActiveMQ有两种消息使用方式:

l  Queue模式:Producer发出到Queue里的消息,只能由一个Consumer来使用。

l  Topic模式:Producer发送到Topic里的消息,会传送到Subscribe这个Topic的每一个Consumer。

 

Producer发出的消息有两种Delivery模式。

l  Persistent:Broker需要保存消息,然后把消息发送到Consumer。如果Broker崩溃后,重新启动后保存的消息可以重新发送给Consumer。

l  NonPersistent:Broker不需要保存消息,直接把消息发送到Consumer。

 

ActiveMQ可以通过Networks of Brokers方式将多个Broker组成一个Cluster。Producer和Consumer可以任意的连接到该Cluster中的任意一个Broker。Producer发送的消息可以通过Cluser传送到需要的Consumer。

 

ActiveMQ提供了Master Slave机制实现Broker的HA,有以下几种方式:

l  JDBC Master Slave

l  Shared File System Master Slave

l  KahaDB Replication(ZooKeeper experimental)

同一个Broker,只能有一个Master来传送消息。当Master崩溃后,其他的一个Slave可以作为Master。采用HA的模式,会增加系统的复杂性,也会影响系统的性能。

方案

实际部署中,ActiveMQ采用Queue的消息使用模式。Producer发送的消息使用Persistent的Delivery模式。

 

在两个node上部署ActiveMQ的Broker,通过ActiveMQ的Networks of Brokers方式来组成Cluster。

 

系统里的消息应用Instance通过ActiveMQ提供的client类库采用failover TCP的方式随机的接入到ActiveMQ的cluster中。正常情况下,消息应用Instance可以通过ActiveMQ的cluster机制正常通信。如果某个ActiveMQ的node崩溃后,client会自动检测到该情况,切换到另一个ActiveMQ的node。

 

由于本系统只采用Queue的消息工作方式,而且消息的传送采用persistent的模式。如果一个node崩溃后,重新启动后,保存的消息还可以重新发送到Consumer。对Broker,就不采用Master/Slave的HA模式,避免增加系统的复杂性和降低系统的性能。

 

配置

ActiveMQ的Broker的配置如下。

 

#Broker 1:

<!– The transport connectors ActiveMQ will listen to –>

<transportConnectors>

<transportConnector name=”openwire” uri=”tcp://0.0.0.0:61616″/>

</transportConnectors>

 

<!–

The store and forward broker networks ActiveMQ will listen to.

We’ll leave it empty as duplex network will be configured by another broker.

–>

<networkConnectors>

</networkConnectors>

 

#Broker 2:

<!– The transport connectors ActiveMQ will listen to –>

<transportConnectors>

<transportConnector name=”openwire” uri=”tcp://0.0.0.0:61616″/>

</transportConnectors>

 

<!–

The store and forward broker networks ActiveMQ will listen to

Create a duplex connector to the first broker

–>

<networkConnectors>

<networkConnector uri=”static:(tcp://{Broker1Ip}:61616)” duplex=”true”/>

</networkConnectors>

版权声明:本文为freebird2018原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/freebird2018/p/8907266.html