将 Kotlin 协程与 Spring Kafka 监听器一起使用
Using Kotlin Coroutines Together with Spring Kafka Listeners
我正在尝试混合使用 Spring Kafka (2.5.6.RELEASE) 监听器和 Kotlin 协程。详细来说,我有一个 suspend fun
:
suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean
然后,我有一个 Spring Kafka 监听器,它必须在每次从分区读取新消息时调用该函数:
@KafkaListener(
id = "priceListener",
topics = [ "prices" ],
groupId = "prices",
properties = [
"key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
]
)
fun listenToPrices(
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
@Payload price: Double) {
useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
}
显然,编译器不允许我调用 updatePrice
,因为错误“暂停函数 'updatePrice' 应该只从协程或另一个暂停函数调用".
在这种情况下哪种方法是正确的?
谢谢。
查看关于 @RabbitListener
的类似问题 here。
It's not clear what you are trying to achieve here.
My understanding is suspend functions can only be called from a coroutine; since @RabbitListener
methods are called by the framework, not user code, we'd have to add a shim between the framework and the listener - but exactly how would that perform any useful function?
我正在尝试混合使用 Spring Kafka (2.5.6.RELEASE) 监听器和 Kotlin 协程。详细来说,我有一个 suspend fun
:
suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean
然后,我有一个 Spring Kafka 监听器,它必须在每次从分区读取新消息时调用该函数:
@KafkaListener(
id = "priceListener",
topics = [ "prices" ],
groupId = "prices",
properties = [
"key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
]
)
fun listenToPrices(
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
@Payload price: Double) {
useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
}
显然,编译器不允许我调用 updatePrice
,因为错误“暂停函数 'updatePrice' 应该只从协程或另一个暂停函数调用".
在这种情况下哪种方法是正确的?
谢谢。
查看关于 @RabbitListener
的类似问题 here。
It's not clear what you are trying to achieve here.
My understanding is suspend functions can only be called from a coroutine; since
@RabbitListener
methods are called by the framework, not user code, we'd have to add a shim between the framework and the listener - but exactly how would that perform any useful function?