Spring Cloud Stream 入门指南
本指南将带您概览 Spring Cloud Stream,并介绍如何创建一个事件驱动的流式应用程序。
从 Spring Initializr 开始
您可以使用这个预初始化项目,然后点击生成以下载ZIP文件。该项目已配置为适合本教程中的示例。
要手动初始化项目:
-
访问 https://start.spring.io。该服务会拉取应用程序所需的所有依赖项,并为您完成大部分设置工作。
-
选择 Gradle 或 Maven 以及您想要使用的语言。本指南假设您选择了 Java。
-
点击 Dependencies,然后选择 Cloud Stream 和 Spring for Rabbit MQ。
-
点击 Generate。
-
下载生成的 ZIP 文件,这是一个根据您的选择配置好的任务应用程序的归档文件。
如果您的 IDE 集成了 Spring Initializr,您可以直接在 IDE 中完成这个过程。
您还可以从 GitHub 上 fork 该项目,并在您的 IDE 或其他编辑器中打开它。
什么是 Spring Cloud Stream?
一个用于构建事件驱动的 Spring Boot 微服务的框架,专为实时流处理而设计。您可以通过项目网站、文档和示例了解更多关于该框架的信息。
流处理应用程序概览
本指南展示了 Spring Cloud Stream 的功能。我们创建了三个应用程序来展示 Spring Cloud Stream 的不同功能。
-
name-source: 定期发布一个字符串以启动流。在此示例中,发布一个名称作为字符串。
-
name-processor: 消费由 name-source 发布的字符串,并以某种方式转换数据。将结果发布到不同的交换器。在此示例中,使用 name 字符串创建记录并添加时间戳。
-
name-sink: 消费来自 name-processor 的结果并执行操作。在此情况下,将结果打印到标准输出。
在本示例中,应用程序的名称遵循 Spring Cloud Stream 的概念(Source
、Processor
、Sink
)。这些概念分别映射到 Java 8 函数的逻辑等价物(Supplier
、Function
、Consumer
)。虽然 Spring Cloud Stream 可以在 Source 和 Sink 中支持一个或多个 Function 实例(通过函数组合),但我们在这里使用了三个独立的应用程序,以展示每个应用程序如何作为独立的应用运行。
在本指南中,我们从后向前进行。也就是说,我们首先构建 Sink 应用程序,接着是 Processor,最后是 Source。在构建过程中,我们使用 RabbitMQ 仪表盘 UI 来测试每个组件。
安装先决条件 - RabbitMQ
要使用 Spring Cloud Stream 功能,我们需要确保消息代理是可访问的。在本指南中,我们使用 RabbitMQ。如果找到本地 Docker 环境,可以使用以下命令启动 RabbitMQ:
结果是 RabbitMQ 应该可以在本地通过用户名/密码 guest/guest
进行访问。
Sink 应用程序
sink (java.util.function.Consumer
) 在 NameSinkConfiguration
中定义为:
在未配置交换名称的情况下启动此应用程序时,RabbitMQ 会自动生成一个名为 nameSink-in-0
的交换器。我们希望自定义此交换器,以便稍后可以将我们的处理器连接到接收器。
为了测试这个接收器,我们手动发布一个表示 Person
记录的 JSON 消息到新生成的交换器:
此外,在我们的应用程序日志中,我们应该看到数据输出:
处理器应用程序
处理器(java.util.function.Function
)在 NameProcessorConfiguration
中定义为:
该函数接受一个字符串值作为输入,并创建一个新的 Person 记录,该记录添加了数据处理时的时间戳。运行此应用程序会在 RabbitMQ 中创建两个新的交换器:processName-in-0
和 processName-out-0
。与我们应用于接收应用程序的配置类似,我们希望更改这些交换器的名称,以便它们能够连接到接收器,并很快连接到供应器。
处理器的输出与接收器的输入相匹配。
使用 RabbitMQ 控制面板,我们现在可以向处理器的输入交换器发送一个字符串(名称),并观察它如何流向连接的接收器。
如果处理器和接收器连接正确,您应该会看到正在运行的接收器的输出:
源应用程序
Source (java.util.function.Supplier
) 在 NameSourceConfiguration
中定义为:
类似于我们将处理器输出连接到接收器输入的方式,我们也必须将源输出连接到处理器输入。
源头的输出应当与处理器的输入相匹配。
如果 name-processor
和 name-sink
已经在运行,启动 name-source
会立即让消息开始流经系统。您应该会看到 name-sink
持续生成的相同名称,但每条消息通过处理器时的时间戳略有不同。不再需要 RabbitMQ 仪表板测试了!您现在拥有了一个功能齐全的流式应用程序。
总结
恭喜!您已经完成了 Spring Cloud Stream 的高级概述,并且能够构建和测试与 RabbitMQ 通信的 Spring Cloud Stream 应用程序。