消息驱动式微服务:Spring Cloud Stream & RabbitMQ
1. 概述
在本文中,我们将向您介绍Spring Cloud Stream
,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如RabbitMQ
、Apache Kafka
等)连接。
Spring Cloud Stream
构建在现有Spring框架(如Spring Messaging
和Spring Integration
)之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的message broker
紧密耦合。此外,有时对某些用例进行扩展是困难的。
Spring Cloud Stream
背后的想法是一个非常典型的Spring Boot
概念——抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
。这意味着您可以通过更改依赖项和配置文件来更改message broker
。可以在这里找到目前已经支持的各种消息代理。
本文将使用RabbitMQ
作为message broker
。在此之前,让我们了解一下broker
(代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。
2. 微服务中的消息
在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个Service-A
内部调用Service-B
和Service-C
来完成一个请求:
是的,还会有其他组件,比如Spring Cloud Eureka
、Spring Cloud Zuul
等等,但我们还是专注关心这类架构的特有问题。
假设由于某种原因Service-B
需要更多的时间来响应。也许它正在执行I/O操作
或长时间的DB事务
,或者进一步调用其它导致Service-B
变得更慢的服务,这些都使其无法更具效率。
现在,我们可以启动更多的Service-B
实例来解决这个问题,这样很好,但是Service-A
实际上是响应很快的,它需要等待Service-B
的响应来进一步处理。这将导致Service-A
无法接收更多的请求,这意味着我们还必须启动Service-A
的多个实例。
另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着Service-A
不直接通过HTTP
调用Service-B
或Service-C
,而是将请求或事件发布给message broker
(消息代理)。Service-B
和Service-C
将成为message broker
(消息代理)上此事件的订阅者。
与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:
- 提高可伸缩性和可靠性——现在我们知道哪些服务是整个应用程序中的真正瓶颈。
- 鼓励松散耦合——
Service-A
不需要了解Service-B
和Service-C
。它只需要连接到message broker
并发布事件。事件如何进一步编排取决于代理设置。通过这种方式,Service-A
可以独立地运行,这是微服务的核心概念之一。 - 与遗留系统交互——通常我们不能将所有东西都移动到一个新的技术堆栈中。我们仍然必须使用遗留系统,虽然速度很慢,但是很可靠。
3. RabbitMQ
高级消息队列协议(AMQP)
是RabbitMQ
用于消息传递的协议。虽然RabbitMQ
支持其他一些协议,但是AMQP
由于兼容性和它提供的大量特性而更受欢迎。
3.1 RabbitMQ架构设计
因此发布者将消息发布到RabbitMQ
中称为Exchange
(交换器)。Exchange
(交换器)接收消息并将其路由到一个或多个Queues
(队列)。路由算法依赖于Exchange
(交换器)类型和routing
(路由)key/header(与消息一起传递)。将Exchange
(交换器)连接到Queues
(队列)的这些规则称为bindings
(绑定)。
绑定可以有4种类型:
-
Direct: 它根据
routing key
(路由键)将Exchange
(交换器)类型直接路由到特定的Queues
(队列)。 -
Fanout:它将消息路由到绑定
Exchange
(交换器)中的所有Queues
(队列)。 -
Topic:它根据完全匹配或部分据
routing key
(路由键)匹配将消息路由到(0、1或更多)的Queues
(队列)。 -
Headers:它类似于
Topic
(主题)交换类型,但是它是基routing header
(路由头)而不是routing key
(路由键)来路由的。
通过Exchange
(交换器)和Queues
(队列)发布和消费消息的整个过程是通过一个Channel
(通道)完成的。
有关路由的详细信息,请访问此链接。
3.2 RabbitMQ 设置
3.2.1 安装
我们可以从这里下载并安装基于我们的操作系统的二进制文件。
然而,在本文中,我们将使用cloudamqp.com
提供的免费云安装。只需注册服务并登录即可。
在主仪表板上单击创建新实例
:
然后给你的实例起个名字,然后进入下一步:
然后选择一个可用区:
最后,查看实例信息,点击右下角的创建实例
:
就是这样。现在在云中运行了一个RabbitMQ
实例。有关实例的更多信息,请转到您的仪表板并单击新创建的实例
:
我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:
我们将在Spring应用程序中使用AMQP URL
连接到这个实例,所以请在某个地方记下它。
您还可以通过单击左上角的RabbitMQ manager
来查看管理器控制台。这将采用它来管理的您的RabbitMQ
实例。
Project 配置
现在我们的设置已经准备好了,让我们创建我们的服务:
- cloud-stream-producer-rabbitmq: 作为一个发布者,将消息推送到
RabbitMQ
- cloud-stream-consumer-rabbitmq: 消费者消费消息
使用Spring Initializr
创建一个脚手架项目。这将是我们的producer
项目,我们将使用REST
端点发布消息。
选择您喜欢的Spring Boot
版本,添加Web
和Cloud Stream
依赖项,生成Maven
项目:
注意:
请注意cloud-stream
依赖项。这也需要像RabbitMQ
、Kafka
等绑定器依赖项才能工作。
由于我们将使用RabbitMQ
,添加以下Maven
依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,我们也可以将两者结合起来使用spring-cloud-starter-stream-rabbit
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
使用同样的方法,创建消费者项目,但仅使用spring-cloud-starter-stream-rabbit
依赖项。
4. 创建生产者
如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个HelloBinding
接口,其中包含我们的消息机制greetingChannel
:
interface HelloBinding {
@Output("greetingChannel")
MessageChannel greeting();
}
因为这将发布消息,所以我们使用@Output
注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个Channel
(通道)。
现在,让我们创建一个REST
,它将消息推送到这个Channel
(通道)
@RestController
public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
上面,我们创建了一个ProducerController
类,它有一个MessageChannel
类型的属性 greet
。这是通过我们前面声明的方法在构造函数中初始化的。
注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。
然后,我们有一个简单的REST
接口,它接收PathVariable
的name
,并使用MessageBuilder
创建一个String
类型的消息。最后,我们使用MessageChannel
上的.send()
方法来发布消息。
现在,我们将在的主类中添加@EnableBinding
注解,传入HelloBinding
告诉Spring
加载。
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
最后,我们必须告诉Spring
如何连接到RabbitMQ
(通过前面的AMQP URL
),并将greetingChannel
连接到一可用的消费者。
这两个都是在application.properties
中定义的:
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
5. 创建消费者
现在,我们需要监听之前创建的通道greetingChannel
。让我们为它创建一个绑定:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING)
SubscribableChannel greeting();
}
与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用SubscribableChannel
和@Input
注解连接到greetingChannel
,消息数据将被推送这里。
现在,让我们创建处理数据的方法:
@EnableBinding(HelloBinding.class)
public class HelloListener {
@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}
在这里,我们创建了一个HelloListener
类,在processHelloChannelGreeting
方法上添加@StreamListener
注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加@EnableBinding
启用了HelloBinding
。
同样,我们在这里使用@EnableBinding
,而不是主类,以便告诉我们如何使用。
看看我们的主类,我们没有任何修改:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
在application.properties
配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
6. 全部测试
让我们同时启动生产者和消费者服务。首先,让我们通过点击端点http://localhost:8080/greet/john
来生产消息。
在消费者日志中看到消息内容:
我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
现在,当我们点击生产者REST
端点生产消息时,我们看到两个消费者都收到了消息:
这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在application.properties
中创建一个消费者组。消费者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:
这一切也可以在RabbitMQ
管理器控制台看到:
7. 结论
在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用Spring Cloud Stream
实现它。我们使用RabbitMQ
作为消息代理,但是我们也可以使用其他流行的代理,比如Kafka
,只需更改配置和依赖项。
与往常一样,本文使用的示例代码可以在GitHub获得完整的源代码。
原文:stackabuse.com/spring-clou…
译者:李东