有没有办法在 Spring Cloud Stream Reactive 方法中动态更改目的地?
Is there a way to dynamically change destination in Spring Cloud Stream Reactive method?
我知道如果我以 Spring 反应方式实现 SC Stream Function,我不能像传统命令方式那样发送 DLQ。
为此,我尝试手动更改 MessageHeader 中的 Destination,以便消息转到 DLQ 主题。
但是,如果发生错误,我想使用onErrorContinue方法向僵尸Topic发送消息,但是这不起作用。
我不应该在 onErrorContinue 中生成它吗?
input.flatMap { sellerDto ->
// do something..
}.onErrorContinue { throwable, source ->
log.error("error occured!! ${throwable.message}")
source as Message<*>
Flux.just(MessageBuilder.withPayload(String(source.payload as ByteArray)).setHeader("spring.cloud.stream.sendto.destination","zombie").build())
}
不,因为反应函数的工作单元是整个流(而不是单个项目)。我提供更多细节here。
也就是说,您可以注入 StreamBridge
,它始终作为 bean 可用,并在 filter
或任何其他适用的响应式运算符中使用它。基本上 streamBridge.send("destination", Message)
我知道如果我以 Spring 反应方式实现 SC Stream Function,我不能像传统命令方式那样发送 DLQ。
为此,我尝试手动更改 MessageHeader 中的 Destination,以便消息转到 DLQ 主题。
但是,如果发生错误,我想使用onErrorContinue方法向僵尸Topic发送消息,但是这不起作用。
我不应该在 onErrorContinue 中生成它吗?
input.flatMap { sellerDto ->
// do something..
}.onErrorContinue { throwable, source ->
log.error("error occured!! ${throwable.message}")
source as Message<*>
Flux.just(MessageBuilder.withPayload(String(source.payload as ByteArray)).setHeader("spring.cloud.stream.sendto.destination","zombie").build())
}
不,因为反应函数的工作单元是整个流(而不是单个项目)。我提供更多细节here。
也就是说,您可以注入 StreamBridge
,它始终作为 bean 可用,并在 filter
或任何其他适用的响应式运算符中使用它。基本上 streamBridge.send("destination", Message)