本指南将引导您完成创建一个 Spring Boot 应用程序的过程,该应用程序能够发布和订阅 RabbitMQ AMQP 服务器。
你将构建什么
您将构建一个应用程序,该应用程序使用 Spring AMQP 的 RabbitTemplate
发布消息,并通过使用 MessageListenerAdapter
在 POJO 上订阅该消息。
所需条件
-
大约15分钟
-
一个喜欢的文本编辑器或IDE
-
Java 17 或更高版本
如何完成本指南
与大多数 Spring 入门指南 一样,您可以从头开始并完成每个步骤,也可以通过查看 此仓库 中的代码直接跳转到解决方案。
要在本地环境中查看最终结果,您可以执行以下操作之一:
-
下载并解压缩本指南的源码仓库
-
使用 Git 克隆仓库:
git clone https://github.com/spring-guides/gs-messaging-rabbitmq.git
-
创建仓库的分支,这样您可以通过提交拉取请求来请求对本指南的更改
配置 RabbitMQ Broker
在构建您的消息应用程序之前,您需要设置一个服务器来处理接收和发送消息。本指南假定您使用 Spring Boot Docker Compose 支持。此方法的前提是您的开发机器上有一个 Docker 环境,例如 Docker Desktop。添加一个 spring-boot-docker-compose
依赖项,该依赖项将执行以下操作:
- 在工作目录中搜索
compose.yml
和其他常见的 compose 文件名 - 使用发现的
compose.yml
调用docker compose up
- 为每个受支持的容器创建服务连接 bean
- 在应用程序关闭时调用
docker compose stop
要使用 Docker Compose 支持,您只需按照本指南操作即可。根据您引入的依赖项,Spring Boot 会找到正确的 compose.yml
文件,并在您运行应用程序时启动您的 Docker 容器。
如果您选择自己运行 RabbitMQ 服务器而不是使用 Spring Boot 的 Docker Compose 支持,您有几个选项:
- 下载服务器并手动运行它
- 如果您使用的是 Mac,可以通过 Homebrew 安装
- 使用
docker-compose up
手动运行compose.yaml
文件
如果您选择任何这些替代方案,您应该从 Maven 或 Gradle 构建文件中移除 spring-boot-docker-compose
依赖项。您还需要在 application.properties
文件中添加配置,如准备构建应用程序部分中更详细描述的那样。如前所述,本指南假设您使用 Spring Boot 中的 Docker Compose 支持,因此目前不需要对 application.properties
进行额外的更改。
从 Spring Initializr 开始
您可以使用这个预先初始化的项目并点击生成以下载一个ZIP文件。该项目已配置为适合本指南中的示例。
要手动初始化项目:
-
访问 start.spring.io。该服务会拉取应用程序所需的所有依赖项,并为您完成大部分设置工作。
-
选择 Gradle 或 Maven 以及您想要使用的语言。本指南假设您选择了 Java。
-
点击 Dependencies,然后选择 Spring for RabbitMQ 和 Docker Compose Support。
-
点击 Generate。
-
下载生成的 ZIP 文件,该文件是根据您的选择配置的应用程序存档。
如果您的 IDE 集成了 Spring Initializr,您可以直接在 IDE 中完成这一过程。
创建 RabbitMQ 消息接收器
在任何基于消息传递的应用程序中,您需要创建一个接收器来响应发布的消息。以下代码清单(来自 src/main/java/com/example/messagingrabbitmq/Receiver.java
)展示了如何实现这一功能:
package com.example.messagingrabbitmq;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
Receiver
是一个 POJO,它定义了接收消息的方法。当您将其注册为接收消息时,可以随意命名。
为了方便起见,这个 POJO 还包含一个
CountDownLatch
。这使它能够发出信号,表示消息已被接收。这通常是您在生产应用程序中不会实现的功能。
注册监听器并发送消息
Spring AMQP 的 RabbitTemplate
提供了您所需的一切来通过 RabbitMQ 发送和接收消息。然而,您需要:
- 配置一个消息监听器容器。
- 声明队列、交换机以及它们之间的绑定关系。
- 配置一个组件来发送一些消息以测试监听器。
Spring Boot 自动创建连接工厂和 RabbitTemplate,减少了您需要编写的代码量。
您将使用 RabbitTemplate
来发送消息,并会在消息监听容器中注册一个 Receiver
来接收消息。连接工厂驱动这两者,使它们能够连接到 RabbitMQ 服务器。以下代码清单(来自 src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java
)展示了如何创建应用程序类:
package com.example.messagingrabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class MessagingRabbitmqApplication {
static final String topicExchangeName = "spring-boot-exchange";
static final String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
}
}
@SpringBootApplication
注解提供了许多好处,正如参考文档中所描述的那样。
在 listenerAdapter()
方法中定义的 bean 被注册为容器(在 container()
中定义)中的消息监听器。它监听 spring-boot
队列上的消息。由于 Receiver
类是一个 POJO,它需要被包装在 MessageListenerAdapter
中,在其中指定它调用 receiveMessage
。
JMS 队列和 AMQP 队列具有不同的语义。例如,JMS 将队列中的消息只发送给一个消费者。而 AMQP 队列虽然也有类似的行为,但 AMQP 生产者并不直接将消息发送到队列。相反,消息会被发送到一个交换机,该交换机可以将消息路由到单个队列,或者广播到多个队列,从而模拟 JMS 主题的概念。
消息监听器容器和接收器 Bean 是监听消息所需的全部内容。要发送消息,您还需要一个 Rabbit 模板。
queue()
方法用于创建 AMQP 队列。exchange()
方法用于创建主题交换机。binding()
方法将这两者绑定在一起,定义了当 RabbitTemplate
向交换机发布消息时的行为。
Spring AMQP 要求将
Queue
、TopicExchange
和Binding
声明为顶层的 Spring bean,以便正确配置。
在这种情况下,我们使用了一个主题交换机,并且队列绑定了一个路由键 foo.bar.#
,这意味着任何以 foo.bar.
开头的路由键发送的消息都会被路由到该队列。
发送测试消息
在这个示例中,测试消息由一个 CommandLineRunner
发送,它还会等待接收器中的锁存器并关闭应用程序上下文。以下代码清单(来自 src/main/java/com.example.messagingrabbitmq/Runner.java
)展示了其工作原理:
package com.example.messagingrabbitmq;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
请注意,模板将消息路由到交换器时使用的路由键为 foo.bar.baz
,这与绑定匹配。
在测试中,您可以模拟运行器,以便可以单独测试接收器。
运行应用程序
main()
方法通过创建 Spring 应用程序上下文来启动该流程。这启动了消息监听容器,开始监听消息。其中有一个 Runner
bean,它会自动运行。它从应用程序上下文中获取 RabbitTemplate
,并在 spring-boot
队列上发送一条 Hello from RabbitMQ!
消息。最后,它关闭 Spring 应用程序上下文,应用程序结束。
您可以通过 IDE 运行 main
方法。需要注意的是,如果您从解决方案仓库克隆了项目,您的 IDE 可能会在错误的位置查找 compose.yaml
文件。您可以配置 IDE 以查找正确的位置,或者使用命令行运行应用程序。./gradlew bootRun
和 ./mvnw spring-boot:run
命令将启动应用程序并自动找到 compose.yaml
文件。
准备构建应用程序
要在没有 Spring Boot Docker Compose 支持的情况下运行代码,您需要在本地运行一个 RabbitMQ 版本以便连接。为此,您可以使用 Docker Compose,但首先必须对 compose.yaml
文件进行两处修改。首先,将 compose.yaml
中的 ports
条目修改为 '5672:5672'
。其次,添加一个 container_name
。
修改后的 compose.yaml
文件应该如下所示:
services:
rabbitmq:
container_name: 'guide-rabbit'
image: 'rabbitmq:latest'
environment:
* 'RABBITMQ_DEFAULT_PASS=secret'
* 'RABBITMQ_DEFAULT_USER=myuser'
ports:
* '5672:5672'
现在你可以运行 docker-compose up
来启动 RabbitMQ 服务。此时你应该已经拥有一个外部 RabbitMQ 服务器,它已经准备好接收请求了。
此外,你还需要告诉 Spring 如何连接到 RabbitMQ 服务器(在使用 Spring Boot Docker Compose 支持时,这一步是自动处理的)。将以下代码添加到 src/main/resources
目录下的新 application.properties
文件中:
spring.rabbitmq.password=secret
spring.rabbitmq.username=myuser
构建应用程序
本节介绍了运行本指南的不同方法:
-
构建并执行 JAR 文件
-
使用 Cloud Native Buildpacks 构建并执行 Docker 容器
无论您选择如何运行应用程序,输出结果应该是相同的。
要运行应用程序,您可以将应用程序打包为可执行的 jar 文件。./gradlew clean build
命令将应用程序编译为可执行的 jar 文件。然后,您可以使用 java -jar build/libs/messaging-rabbitmq-0.0.1-SNAPSHOT.jar
命令来运行该 jar 文件。
或者,如果您有可用的 Docker 环境,您可以直接使用 buildpacks 从 Maven 或 Gradle 插件创建 Docker 镜像。通过 Cloud Native Buildpacks,您可以创建可在任何地方运行的 Docker 兼容镜像。Spring Boot 直接支持 Maven 和 Gradle 的 buildpack。这意味着您可以键入一个命令,并快速将一个合理的镜像推送到本地运行的 Docker 守护进程中。要使用 Cloud Native Buildpacks 创建 Docker 镜像,请运行 ./gradlew bootBuildImage
命令。在启用 Docker 环境的情况下,您可以使用 docker run --network container:guide-rabbit docker.io/library/messaging-rabbitmq:0.0.1-SNAPSHOT
命令来运行应用程序。
--network
标志告诉 Docker 将我们的指南容器附加到外部容器正在使用的现有网络中。您可以在 Docker 文档 中找到更多信息。
无论您选择如何构建和运行应用程序,您都应该看到以下输出:
Sending message...
Received <Hello from RabbitMQ!>
总结
恭喜!您刚刚使用 Spring 和 RabbitMQ 开发了一个简单的发布订阅应用程序。您可以利用 Spring 和 RabbitMQ 做比这里介绍的更多事情,但本指南应该为您提供了一个良好的开端。
另请参阅
以下指南可能也会有所帮助: