一般在稍微大一点的项目中,需要配置多个数据库数据源,最简单的方式是用 Spring 来实现,只需要继承 AbstractRoutingDataSource 类,实现 determineCurrentLookupKey 方法,再配合使用 ThreadLocal 就可以实现。

但是如何实现 MQ 的多数据源呢?假设有部署在不同服务器上的两个消息队列,或者是同一服务器,不同 vhost 的消息队列,在一个项目中,我如何自由地选择从哪个队列收发消息呢?下面说说用 Spring AMQP + Rabbit 的实现过程及踩过的坑。

最开始的单数据源的实现很简单,网上有好多博文可以参考,官网也有介绍。主要就是创建一个 xml 的配置文件,添加各种必要的配置,声明 connection-factory、rabbitListenerContainerFactory、rabbitTemplate、queue、exchange、binding 等等。然后用 RabbitTemplate 来发消息,用 @RabbitListener 注解来监听,用 queue 指定队列来收消息,这里就不赘述了。主要说一下,在现有的基础上实现多数据源的收发。

先说配置方面,为了对比,下面先给出单数据源配置:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/rabbit
  8. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  9. <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
  10.        requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
  11.  
  12. <bean id="rabbitListenerContainerFactory"
  13. class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
  14. <property name="connectionFactory" ref="rabbitConnectionFactory"/>
  15. <property name="concurrentConsumers" value="16"/>
  16. <property name="maxConcurrentConsumers" value="50"/>
  17. </bean>
  18. <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
  19. <!-- queue declare -->
  20. <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
  21. <!-- bind queue to exchange -->
  22. <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
  23. <rabbit:bindings>
  24. <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
  25. </rabbit:bindings>
  26. </rabbit:direct-exchange>
  27.  
  28. <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
  29.  
  30. <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
  31. <property name="backOffPolicy">
  32. <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
  33. <property name="initialInterval" value="500"/>
  34. <property name="multiplier" value="10.0"/>
  35. <property name="maxInterval" value="10000"/>
  36. </bean>
  37. </property>
  38. </bean>
  39. </beans>

为了实现双数据源,查阅了很多资料,最初实现的配置如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/rabbit
  8. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  9. <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
  10. password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
  11. <!-- 添加了一个连接工厂,参数从 properties 文件中取 -->
  12. <rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}"
  13.        password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>
  14.  
  15. <!-- 添加 SimpleRoutingConnectionFactory 配置,将两个 Connection factory 配置好-->
  16. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
  17. <property name="targetConnectionFactories">
  18. <map>
  19. <entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
  20. <entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
  21. </map>
  22. </property>
  23. </bean>
  24. <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
  25.  
  26. <!-- 由于增加了一个连接工厂,ContainerFactory 的连接工厂改为新增的 ConnectionFactory -->
  27. <bean id="rabbitListenerContainerFactory"
  28. class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
  29. <!-- <property name="connectionFactory" ref="rabbitConnectionFactory"/> -->
  30. <property name="connectionFactory" ref="connectionFactory"/>
  31. <property name="concurrentConsumers" value="16"/>
  32. <property name="maxConcurrentConsumers" value="50"/>
  33. </bean>
  34.  
  35. <!-- queue declare,增加一个消息队列 -->
  36. <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
  37. <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
  38. <!-- bind queue to exchange -->
  39. <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
  40. <rabbit:bindings>
  41. <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
  42. <rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
  43. </rabbit:bindings>
  44. </rabbit:direct-exchange>
  45.  
  46. <!-- connection-factory 改为新增的 ConnectionFactory -->
  47. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
  48.  
  49. <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
  50. <property name="backOffPolicy">
  51. <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
  52. <property name="initialInterval" value="500"/>
  53. <property name="multiplier" value="10.0"/>
  54. <property name="maxInterval" value="10000"/>
  55. </bean>
  56. </property>
  57. </bean>
  58. </beans>

改动都写在注释里了,主要就是增加了一个连接工厂的配置,其他配置做了一些相应的适配。

发消息的时候,需要指定连接工厂,也就是说,你要往哪个消息服务器发:

  1. @Test
  2. public void testSendMsg() {
  3. SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory");
  4. rabbitTemplate.convertAndSend("exchange", "rkey.test", "test");
  5. SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
  6. SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory1");
  7. rabbitTemplate.convertAndSend("exchange", "rkey.test1", "test1");
  8. SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
  9. }

在发消息之前调用 SimpleResourceHolder.bind 绑定要使用的工厂,发完之后,调用 unbind 解除绑定。将上述代码封装为两个工具类,更好。

然后,有一个大坑在前面。。。如何收消息?

发消息要绑定连接工厂,指明往哪个消息服务器上发,收的时候,同样得指定要从哪个消息服务器上收。最开始没想到这点,以为只要指定队列名称就可以,如下:

  1. @RabbitListener(queues = "queue.test")
  2. public void receiveMsg(Message message) {
  3. String msg = new String(message.getBody());
  4. System.out.println(msg);
  5. }

然并卵,报了异常:

  1. java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
  2. at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:116) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  3. at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:94) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  4. at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  5. at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  6. at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  7. at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:456) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  8. at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1158) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
  9. at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

这个问题不好解决,查了很多资料都没用,比如这种方式:https://stackoverflow.com/questions/42784471/spring-amqp-mix-simpleroutingconnectionfactory-with-rabbitlistener  。

无奈之下,只能试着看看 Spring 的 AMQP 怎么实现,看看有没有解决的办法,最开始想的是继承 Spring 的某个类来实现。然而,看来看去,很是头大,没有结果。

最后无意间点到了 @RabbitListener 这个注解中,发现了有一个属性,瞬间感觉很兴奋,如下图:

看了下注释,这里可以指定一个 containerFactory,感觉可以试试。首先只有一个 containerFactory,那就加一个吧。为了看的比较清晰,我把第一次添加的注释去掉了,于是配置成了这样:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/rabbit
  8. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  9. <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
  10. password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
  11. <rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}"
  12.        password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>
  13.  
  14. <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
  15. <property name="targetConnectionFactories">
  16. <map>
  17. <entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
  18. <entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
  19. </map>
  20. </property>
  21. </bean>
  22. <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
  23. <!-- 添加一个 rabbitAdmin-->
  24. <rabbit:admin id="rabbitAdmin1" connection-factory="rabbitConnectionFactory1"/>
  25.  
  26. <!-- 把原有的 ContainerFactory 的连接工厂改为 rabbitConnectionFactory-->
  27. <bean id="rabbitListenerContainerFactory"
  28. class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
  29. <property name="connectionFactory" ref="rabbitConnectionFactory"/>
  30. <property name="concurrentConsumers" value="16"/>
  31. <property name="maxConcurrentConsumers" value="50"/>
  32. </bean>
  33. <!-- 添加一个 ContainerFactory, 连接工厂为 rabbitConnectionFactory1-->
  34. <bean id="rabbitListenerContainerFactory1"
  35. class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
  36. <property name="connectionFactory" ref="rabbitConnectionFactory1"/>
  37. <property name="concurrentConsumers" value="16"/>
  38. <property name="maxConcurrentConsumers" value="50"/>
  39. </bean>
  40.  
  41. <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
  42. <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
  43. <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
  44. <rabbit:bindings>
  45. <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
  46. <rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
  47. </rabbit:bindings>
  48. </rabbit:direct-exchange>
  49.  
  50. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
  51.  
  52. <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
  53. <property name="backOffPolicy">
  54. <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
  55. <property name="initialInterval" value="500"/>
  56. <property name="multiplier" value="10.0"/>
  57. <property name="maxInterval" value="10000"/>
  58. </bean>
  59. </property>
  60. </bean>
  61. </beans>

收消息的时候指定 container factory 即可:

  1. @RabbitListener(queues = "queue.test", containerFactory = "rabbitListenerContainerFactory")
  2. public void receiveMsg(Message message) {
  3. String msg = new String(message.getBody());
  4. System.out.println(msg);
  5. }

测试通过!

以上配置、解决办法是尝试过多次以后得出的,所以还是要有耐心,多尝试。

由于在网上没有找到解决办法,只有自己摸索着解决,如果大家有其他解决方案,欢迎留言讨论!

 

版权声明:本文为HarrisonHao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:http://www.cnblogs.com/HarrisonHao/p/7884925.html