Spring Boot 与 Docker
观察 GraphQL 的实际运行

Spring Cloud Stream 入门指南

本指南将带您概览 Spring Cloud Stream,并介绍如何创建一个事件驱动的流式应用程序。

从 Spring Initializr 开始

您可以使用这个预初始化项目,然后点击生成以下载ZIP文件。该项目已配置为适合本教程中的示例。

要手动初始化项目:

  1. 访问 https://start.spring.io。该服务会拉取应用程序所需的所有依赖项,并为您完成大部分设置工作。

  2. 选择 Gradle 或 Maven 以及您想要使用的语言。本指南假设您选择了 Java。

  3. 点击 Dependencies,然后选择 Cloud StreamSpring for Rabbit MQ

  4. 点击 Generate

  5. 下载生成的 ZIP 文件,这是一个根据您的选择配置好的任务应用程序的归档文件。

如果您的 IDE 集成了 Spring Initializr,您可以直接在 IDE 中完成这个过程。

您还可以从 GitHub 上 fork 该项目,并在您的 IDE 或其他编辑器中打开它。

什么是 Spring Cloud Stream?

一个用于构建事件驱动的 Spring Boot 微服务的框架,专为实时流处理而设计。您可以通过项目网站文档示例了解更多关于该框架的信息。

流处理应用程序概览

本指南展示了 Spring Cloud Stream 的功能。我们创建了三个应用程序来展示 Spring Cloud Stream 的不同功能。

  • name-source: 定期发布一个字符串以启动流。在此示例中,发布一个名称作为字符串。

  • name-processor: 消费由 name-source 发布的字符串,并以某种方式转换数据。将结果发布到不同的交换器。在此示例中,使用 name 字符串创建记录并添加时间戳。

  • name-sink: 消费来自 name-processor 的结果并执行操作。在此情况下,将结果打印到标准输出。

在本示例中,应用程序的名称遵循 Spring Cloud Stream 的概念(SourceProcessorSink)。这些概念分别映射到 Java 8 函数的逻辑等价物(SupplierFunctionConsumer)。虽然 Spring Cloud Stream 可以在 Source 和 Sink 中支持一个或多个 Function 实例(通过函数组合),但我们在这里使用了三个独立的应用程序,以展示每个应用程序如何作为独立的应用运行。

流式应用程序分层架构

在本指南中,我们从后向前进行。也就是说,我们首先构建 Sink 应用程序,接着是 Processor,最后是 Source。在构建过程中,我们使用 RabbitMQ 仪表盘 UI 来测试每个组件。

安装先决条件 - RabbitMQ

要使用 Spring Cloud Stream 功能,我们需要确保消息代理是可访问的。在本指南中,我们使用 RabbitMQ。如果找到本地 Docker 环境,可以使用以下命令启动 RabbitMQ:

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Plain text

结果是 RabbitMQ 应该可以在本地通过用户名/密码 guest/guest 进行访问。

Sink 应用程序

sink (java.util.function.Consumer) 在 NameSinkConfiguration 中定义为:

@Bean
public Consumer<Person> nameSink() {
  return person -> {
    System.out.println(person.name());
    System.out.println(person.processedTimestamp());
  };
}
Plain text

在未配置交换名称的情况下启动此应用程序时,RabbitMQ 会自动生成一个名为 nameSink-in-0 的交换器。我们希望自定义此交换器,以便稍后可以将我们的处理器连接到接收器。

spring.cloud.stream.function.bindings.nameSink-in-0=sinkinput
Plain text

为了测试这个接收器,我们手动发布一个表示 Person 记录的 JSON 消息到新生成的交换器:

rabbitmq sink

此外,在我们的应用程序日志中,我们应该看到数据输出:

output sink

处理器应用程序

处理器(java.util.function.Function)在 NameProcessorConfiguration 中定义为:

@Bean
public Function<String, Person> processName() {
  return name -> new Person(name, new Date().getTime());
}
Plain text

该函数接受一个字符串值作为输入,并创建一个新的 Person 记录,该记录添加了数据处理时的时间戳。运行此应用程序会在 RabbitMQ 中创建两个新的交换器:processName-in-0processName-out-0。与我们应用于接收应用程序的配置类似,我们希望更改这些交换器的名称,以便它们能够连接到接收器,并很快连接到供应器。

spring.cloud.stream.function.bindings.processName-in-0=processorinput
spring.cloud.stream.function.bindings.processName-out-0=sinkinput
Plain text

处理器的输出与接收器的输入相匹配。

使用 RabbitMQ 控制面板,我们现在可以向处理器的输入交换器发送一个字符串(名称),并观察它如何流向连接的接收器。

rabbitmq processor

如果处理器和接收器连接正确,您应该会看到正在运行的接收器的输出:

output processor sink

源应用程序

Source (java.util.function.Supplier) 在 NameSourceConfiguration 中定义为:

@Bean
public Supplier<String> supplyName() {
  return () -> "Christopher Pike";
}
Plain text

类似于我们将处理器输出连接到接收器输入的方式,我们也必须将源输出连接到处理器输入。

spring.cloud.stream.function.bindings.supplyName-out-0=processorinput
Plain text

源头的输出应当与处理器的输入相匹配。

如果 name-processorname-sink 已经在运行,启动 name-source 会立即让消息开始流经系统。您应该会看到 name-sink 持续生成的相同名称,但每条消息通过处理器时的时间戳略有不同。不再需要 RabbitMQ 仪表板测试了!您现在拥有了一个功能齐全的流式应用程序。

总结

恭喜!您已经完成了 Spring Cloud Stream 的高级概述,并且能够构建和测试与 RabbitMQ 通信的 Spring Cloud Stream 应用程序。