将 Kotlin 协程与 Spring Kafka 监听器一起使用

Using Kotlin Coroutines Together with Spring Kafka Listeners

我正在尝试混合使用 Spring Kafka (2.5.6.RELEASE) 监听器和 Kotlin 协程。详细来说,我有一个 suspend fun:

suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean

然后,我有一个 Spring Kafka 监听器,它必须在每次从分区读取新消息时调用该函数:

@KafkaListener(
    id = "priceListener",
    topics = [ "prices" ],
    groupId = "prices",
    properties = [
        "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
    ]
 ) 
 fun listenToPrices(
    @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
    @Payload price: Double) {

    useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
 }

显然,编译器不允许我调用 updatePrice,因为错误“暂停函数 'updatePrice' 应该只从协程或另一个暂停函数调用".

在这种情况下哪种方法是正确的?

谢谢。

查看关于 @RabbitListener 的类似问题 here

It's not clear what you are trying to achieve here.

My understanding is suspend functions can only be called from a coroutine; since @RabbitListener methods are called by the framework, not user code, we'd have to add a shim between the framework and the listener - but exactly how would that perform any useful function?