如何将消息发送到 SCDF 中的两个不同输出通道?

How to send messages to two different output channels in SCDF?

我有一个源通过默认 output 通道将消息发送到流中的处理器。现在我想通过不同的渠道发送失败消息。

我想我应该创建一个可绑定的接口,它从 Source 扩展并使用 @Output 添加额外的通道。我如何确保 SCDF 实际上为这个频道创建了一个 Kafka 主题? IOW,流定义是什么样的?

例如类似

的东西

source | processor | sink source > error-sink

source | processor 使用常规 output channel/Kafka 主题,source > error-sink 使用不同的 channel/topic。

如果需要跟踪下游处理的错误消息,您可以使用与 Spring Cloud Stream 关​​联的 OOTB DLQ 机制。 Rabbit and Kafka. You could enable DLQ in Spring Cloud Data Flow (SCDF) as a global setting 或每个流都支持它。

如果您仍然想要定义您的自定义渠道来以不同方式处理消息,您必须创建一个类似于此 sample 的自定义界面。

在 SCDF 中部署流时,您可以分别通过 spring.cloud.stream.kafka.bindings.<channelName>.producerspring.cloud.stream.kafka.bindings.<channelName>.consumer 绑定属性覆盖生产者和消费者之间的目的地。

编辑:

虽然有上述方法,但我从 Spring Cloud Stream 负责人 (@marius-bogoevici) 那里了解到了一个更简单的解决方案。

已经有一个可供使用的默认错误通道,并且 Spring 集成支持它。

有了这个,在您的应用程序中,您可以通过以下方式将自定义消息发送到默认错误通道:@Autowire @Qualifier("errorChannel")。事实上,此支持适用于所有 OOTB 应用程序。

然后您可以通过以下方式覆盖此错误通道的目标:spring.cloud.stream.bindings.error.destination=errorchannel-test。在 SCDF 中,您将在流部署时通过以下方式传递此信息:--properties

例如:

stream create foo --definition "mysource | log"

stream deploy foo --properties "app.mysource.spring.cloud.stream.bindings.error.destination=errorchannel-test"