结合入站通道适配器和流发射器
combining inbound channel adapters and stream emitter
我正在玩 spring 云流反应,我遇到了问题。
考虑以下代码:
@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()
我的目标是生成一种应由以下形式消耗的通量:
@StreamListener("list") @Output("download")
fun processList(center: Center): Flux<Product> = ...
但这似乎行不通。通道适配器正确生成通量,但它表示如下:
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')
我想我应该在入站通道适配器上添加一个 StreamEmitter
注释,但这似乎不起作用。
实现这种流程的正确方法是什么?
谢谢你和问候,
费尔南多
例外情况非常明显:您生成一个 Flux
对象作为消息的 payload
发送到 list
通道以发送到消息传递的目标目的地中间件。并且完全正确 Flux
与要序列化的 JSON 不兼容。
另一方面,我不确定什么是 Kotlin 并将您的代码编译为 Java,但我们有 out-of-the-box 类似的东西(对于 Java):
@StreamEmitter
@Output("list")
public Flux<Center> timerMessageSource() {
return config.centers.toFlux();
}
并且 flux 中的每个发出的项目都将被序列化并作为记录或消息发送到 Binder。当然,如果您的 Center
与 JSON 兼容。为此,您需要 spring-cloud-stream-reactive
依赖项。
是的,@InboundChannelAdapter
在这里没有帮助,甚至打扰了。
如果您担心一些周期性发射,是否应该考虑 Project Reactor 中的调度支持。
我正在玩 spring 云流反应,我遇到了问题。
考虑以下代码:
@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()
我的目标是生成一种应由以下形式消耗的通量:
@StreamListener("list") @Output("download")
fun processList(center: Center): Flux<Product> = ...
但这似乎行不通。通道适配器正确生成通量,但它表示如下:
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')
我想我应该在入站通道适配器上添加一个 StreamEmitter
注释,但这似乎不起作用。
实现这种流程的正确方法是什么?
谢谢你和问候,
费尔南多
例外情况非常明显:您生成一个 Flux
对象作为消息的 payload
发送到 list
通道以发送到消息传递的目标目的地中间件。并且完全正确 Flux
与要序列化的 JSON 不兼容。
另一方面,我不确定什么是 Kotlin 并将您的代码编译为 Java,但我们有 out-of-the-box 类似的东西(对于 Java):
@StreamEmitter
@Output("list")
public Flux<Center> timerMessageSource() {
return config.centers.toFlux();
}
并且 flux 中的每个发出的项目都将被序列化并作为记录或消息发送到 Binder。当然,如果您的 Center
与 JSON 兼容。为此,您需要 spring-cloud-stream-reactive
依赖项。
是的,@InboundChannelAdapter
在这里没有帮助,甚至打扰了。
如果您担心一些周期性发射,是否应该考虑 Project Reactor 中的调度支持。