有没有办法在 Spring Cloud Stream Reactive 方法中动态更改目的地?

Is there a way to dynamically change destination in Spring Cloud Stream Reactive method?

我知道如果我以 Spring 反应方式实现 SC Stream Function,我不能像传统命令方式那样发送 DLQ。

为此,我尝试手动更改 MessageHeader 中的 Destination,以便消息转到 DLQ 主题。

但是,如果发生错误,我想使用onErrorContinue方法向僵尸Topic发送消息,但是这不起作用。

我不应该在 onErrorContinue 中生成它吗?

input.flatMap { sellerDto ->
            // do something..
        }.onErrorContinue { throwable, source ->
            log.error("error occured!! ${throwable.message}")
            source as Message<*>
            Flux.just(MessageBuilder.withPayload(String(source.payload as ByteArray)).setHeader("spring.cloud.stream.sendto.destination","zombie").build())
        }

不,因为反应函数的工作单元是整个流(而不是单个项目)。我提供更多细节here

也就是说,您可以注入 StreamBridge,它始终作为 bean 可用,并在 filter 或任何其他适用的响应式运算符中使用它。基本上 streamBridge.send("destination", Message)