如何将消息发送到 SCDF 中的两个不同输出通道?
How to send messages to two different output channels in SCDF?
我有一个源通过默认 output
通道将消息发送到流中的处理器。现在我想也通过不同的渠道发送失败消息。
我想我应该创建一个可绑定的接口,它从 Source
扩展并使用 @Output
添加额外的通道。我如何确保 SCDF 实际上为这个频道创建了一个 Kafka 主题? IOW,流定义是什么样的?
例如类似
的东西
source | processor | sink
source > error-sink
source | processor
使用常规 output
channel/Kafka 主题,source > error-sink
使用不同的 channel/topic。
如果需要跟踪下游处理的错误消息,您可以使用与 Spring Cloud Stream 关联的 OOTB DLQ 机制。 Rabbit and Kafka. You could enable DLQ in Spring Cloud Data Flow (SCDF) as a global setting 或每个流都支持它。
如果您仍然想要定义您的自定义渠道来以不同方式处理消息,您必须创建一个类似于此 sample 的自定义界面。
在 SCDF 中部署流时,您可以分别通过 spring.cloud.stream.kafka.bindings.<channelName>.producer
和 spring.cloud.stream.kafka.bindings.<channelName>.consumer
绑定属性覆盖生产者和消费者之间的目的地。
编辑:
虽然有上述方法,但我从 Spring Cloud Stream 负责人 (@marius-bogoevici) 那里了解到了一个更简单的解决方案。
已经有一个可供使用的默认错误通道,并且 Spring 集成支持它。
有了这个,在您的应用程序中,您可以通过以下方式将自定义消息发送到默认错误通道:@Autowire @Qualifier("errorChannel")
。事实上,此支持也适用于所有 OOTB 应用程序。
然后您可以通过以下方式覆盖此错误通道的目标:spring.cloud.stream.bindings.error.destination=errorchannel-test
。在 SCDF 中,您将在流部署时通过以下方式传递此信息:--properties
。
例如:
stream create foo --definition "mysource | log"
stream deploy foo --properties "app.mysource.spring.cloud.stream.bindings.error.destination=errorchannel-test"
我有一个源通过默认 output
通道将消息发送到流中的处理器。现在我想也通过不同的渠道发送失败消息。
我想我应该创建一个可绑定的接口,它从 Source
扩展并使用 @Output
添加额外的通道。我如何确保 SCDF 实际上为这个频道创建了一个 Kafka 主题? IOW,流定义是什么样的?
例如类似
的东西source | processor | sink
source > error-sink
source | processor
使用常规 output
channel/Kafka 主题,source > error-sink
使用不同的 channel/topic。
如果需要跟踪下游处理的错误消息,您可以使用与 Spring Cloud Stream 关联的 OOTB DLQ 机制。 Rabbit and Kafka. You could enable DLQ in Spring Cloud Data Flow (SCDF) as a global setting 或每个流都支持它。
如果您仍然想要定义您的自定义渠道来以不同方式处理消息,您必须创建一个类似于此 sample 的自定义界面。
在 SCDF 中部署流时,您可以分别通过 spring.cloud.stream.kafka.bindings.<channelName>.producer
和 spring.cloud.stream.kafka.bindings.<channelName>.consumer
绑定属性覆盖生产者和消费者之间的目的地。
编辑:
虽然有上述方法,但我从 Spring Cloud Stream 负责人 (@marius-bogoevici) 那里了解到了一个更简单的解决方案。
已经有一个可供使用的默认错误通道,并且 Spring 集成支持它。
有了这个,在您的应用程序中,您可以通过以下方式将自定义消息发送到默认错误通道:@Autowire @Qualifier("errorChannel")
。事实上,此支持也适用于所有 OOTB 应用程序。
然后您可以通过以下方式覆盖此错误通道的目标:spring.cloud.stream.bindings.error.destination=errorchannel-test
。在 SCDF 中,您将在流部署时通过以下方式传递此信息:--properties
。
例如:
stream create foo --definition "mysource | log"
stream deploy foo --properties "app.mysource.spring.cloud.stream.bindings.error.destination=errorchannel-test"