Using emit to build a Kotlin flow runs indefinitely and doesn't complete

I am using a Java library that lets me subscribe to events from my EventStore database.

I can create a subscription from the following SubscriptionListener:

public abstract class SubscriptionListener {
    public void onEvent(Subscription subscription, ResolvedEvent event) {
    }

    public void onError(Subscription subscription, Throwable throwable) {
    }

    public void onCancelled(Subscription subscription) {
    }
}

I want to emit each ResolvedEvent as part of a flow every time the subscription fires. However, the call to emit never completes.

    fun flowSubscriptionListener(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
        onError: (subscription: Subscription?, throwable: Throwable) -> Unit = { _, _ -> },
        onCancelled: (subscription: Subscription) -> Unit = { _ -> }
    ): Flow<ResolvedEvent> {
        return flow {
            val listener = object : SubscriptionListener() {
                override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                    logger.info {
                        "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}"
                    }

                    runBlocking {
                        logger.info { "emitting event" }
                        this@flow.emit(event)
                        logger.info { "Event emitted" }
                    }
                }

                override fun onError(subscription: Subscription?, throwable: Throwable) {
                    logger.error {
                        "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                    }
                    onError(subscription, throwable)
                }

                override fun onCancelled(subscription: Subscription) {
                    logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                    onCancelled(subscription)
                }
            }
            client.subscribeToStream(streamName, listener).await()
        }.buffer(10)
    }

I have a sample setup in which I collect a flow that should contain three events:

flowSubscriptionListener(
    streamName = "SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19",
).map {
    it.event.eventType
}.collect {
    println(it)
}

However, I do not receive any events at all. The console output shows that the call to emit never returns:

[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - Received event 0@SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - emitting event

I expected to see the "Event emitted" log entry as well.

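To wrap a callback-based API, you should use callbackFlow instead. It supports concurrent emissions, including from threads other than the collector's, which I think may be your problem: your listener is invoked on a gRPC executor thread and calls emit from there.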

In addition, it lets you properly cancel the subscription when the flow itself is cancelled (via awaitClose()).
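As a minimal, self-contained sketch of both points, independent of the EventStoreDB client: the `Ticker` class below is a hypothetical callback-based source invented for illustration. It calls its listener from its own background thread, and `ticksOf` wraps it with callbackFlow so that cancelling the flow also stops the ticker.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based source standing in for the Java library.
// It invokes the listener from its own daemon thread until stop() is called.
class Ticker {
    val stopped = AtomicBoolean(false)

    fun start(onTick: (Int) -> Unit) {
        thread(isDaemon = true) {
            var i = 0
            while (!stopped.get()) {
                onTick(i++)
                Thread.sleep(10)
            }
        }
    }

    fun stop() = stopped.set(true)
}

fun ticksOf(ticker: Ticker): Flow<Int> = callbackFlow {
    // Values are handed over through a channel, so sending from the
    // ticker's thread is allowed (unlike calling emit from it).
    ticker.start { value -> trySendBlocking(value) }
    // Suspends until the flow is cancelled or closed, then runs the cleanup.
    awaitClose { ticker.stop() }
}

fun main() = runBlocking {
    val ticker = Ticker()
    // take(3) cancels the flow after three values, which triggers awaitClose.
    val firstThree = ticksOf(ticker).take(3).toList()
    println(firstThree)           // [0, 1, 2]
    println(ticker.stopped.get()) // true
}
```

The collector never touches the ticker's thread directly; the only shared piece is the channel that callbackFlow creates behind the scenes.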

Here is one way to do it for your subscription:

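import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.future.await // assumes subscribeToStream returns a CompletableFuture

fun EventStoreDBClient.flowSubscription(
    streamName: String,
    options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
): Flow<ResolvedEvent> = callbackFlow {
    val listener = object : SubscriptionListener() {
        override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
            logger.info { "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}" }
            logger.info { "Emitting event" }
            // Blocks the callback thread while the buffer is full instead of dropping the event.
            trySendBlocking(event)
            logger.info { "Event emitted" }
        }

        override fun onError(subscription: Subscription?, throwable: Throwable) {
            logger.error {
                "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
            }
            // Closing with a cause makes the collector fail with this throwable.
            close(throwable)
        }

        override fun onCancelled(subscription: Subscription) {
            logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
            // Closing without a cause completes the flow normally.
            close()
        }
    }
    val subscription = subscribeToStream(streamName, listener, options).await()
    // Keeps the flow open until it is cancelled or closed, then stops the subscription.
    awaitClose {
        subscription.stop()
    }
}.buffer(10)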

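Note that I also turned this into an extension function on EventStoreDBClient, which seems appropriate here. I removed the error/cancellation callbacks because Flow already handles both cases (you can put them back if you need them).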
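If you still want hooks equivalent to the removed callbacks, the standard flow operators `catch` and `onCompletion` can take that role on the collecting side. The sketch below is only an illustration: `failingEvents` is a hypothetical stand-in for a subscription that delivers two events and then fails, and the event names are made up.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a subscription: two events, then an error.
fun failingEvents(): Flow<String> = callbackFlow {
    trySend("event-1")
    trySend("event-2")
    // Same mechanism as close(throwable) in the listener's onError.
    close(IllegalStateException("connection lost"))
    awaitClose { }
}

// Collects the flow and records what each hook observed, in order.
suspend fun collectLog(): List<String> {
    val log = mutableListOf<String>()
    failingEvents()
        .onCompletion { cause -> log += "completed, cause=${cause?.message}" }
        .catch { e -> log += "error: ${e.message}" }
        .collect { log += "event: $it" }
    return log
}

fun main() = runBlocking {
    collectLog().forEach(::println)
    // event: event-1
    // event: event-2
    // completed, cause=connection lost
    // error: connection lost
}
```

Here `onCompletion` sees both normal and failed termination (with a null cause on normal completion), while `catch` only handles upstream failures, so the two together cover what the onError and onCancelled parameters did in the question.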