Using emit to build a Kotlin flow runs indefinitely and doesn't complete
I use a Java library that lets me subscribe to events from my EventStore database.
I can create a subscription based on the following SubscriptionListener:
    public abstract class SubscriptionListener {
        public void onEvent(Subscription subscription, ResolvedEvent event) {
        }
        public void onError(Subscription subscription, Throwable throwable) {
        }
        public void onCancelled(Subscription subscription) {
        }
    }
I want to emit ResolvedEvents as part of a flow every time the subscription fires. However, the call to emit never completes.
    fun flowSubscriptionListener(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
        onError: (subscription: Subscription?, throwable: Throwable) -> Unit = { _, _ -> },
        onCancelled: (subscription: Subscription) -> Unit = { _ -> }
    ): Flow<ResolvedEvent> {
        return flow {
            val listener = object : SubscriptionListener() {
                override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                    logger.info {
                        "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}"
                    }
                    runBlocking {
                        logger.info { "emitting event" }
                        this@flow.emit(event)
                        logger.info { "Event emitted" }
                    }
                }
                override fun onError(subscription: Subscription?, throwable: Throwable) {
                    logger.error {
                        "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                    }
                    onError(subscription, throwable)
                }
                override fun onCancelled(subscription: Subscription) {
                    logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                    onCancelled(subscription)
                }
            }
            client.subscribeToStream(streamName, listener).await()
        }.buffer(10)
    }
I have a sample setup in which I collect a flow that should contain three events:
    flowSubscriptionListener(
        streamName = "SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19",
    ).map {
        it.event.eventType
    }.collect {
        println(it)
    }
However, I don't receive any events at all. The console output shows that the call to emit never returns:
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - Received event 0@SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - emitting event
I would expect a log line saying "Event emitted".
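To wrap a callback-based API, you should use callbackFlow instead. It supports concurrent emissions, which I think is likely your problem: with the plain flow builder, emit is not meant to be called from another thread or coroutine.
In addition, it properly handles cancellation of the subscription when the flow itself is cancelled (via awaitClose()).
Here is one way to do it: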
    fun EventStoreDBClient.flowSubscription(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
    ): Flow<ResolvedEvent> = callbackFlow {
        val listener = object : SubscriptionListener() {
            override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                logger.info { "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}" }
                logger.info { "Emitting event" }
                // Can be called from the library's callback thread; blocks that thread while the buffer is full.
                trySendBlocking(event)
                logger.info { "Event emitted" }
            }
            override fun onError(subscription: Subscription?, throwable: Throwable) {
                logger.error {
                    "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                }
                // Closing with a cause makes the collector fail with this throwable.
                close(throwable)
            }
            override fun onCancelled(subscription: Subscription) {
                logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                // Completes the flow normally.
                close()
            }
        }
        val subscription = subscribeToStream(streamName, listener, options).await()
        // Suspends until the flow is closed or cancelled, then stops the subscription.
        awaitClose {
            subscription.stop()
        }
    }.buffer(10)
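Note that I also turned it into an extension function on EventStoreDBClient, which seems appropriate here. I removed the error/cancellation callbacks because Flow already handles those (you can put them back if you need them).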
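If you want to try the pattern without an EventStoreDB instance, here is a minimal sketch of the same idea against a made-up callback API. TickListener, TickSource and ticks() are hypothetical names invented for this illustration and are not part of any library; the only real dependency assumed is kotlinx-coroutines-core. It also shows how an error reported through close(cause) can be handled on the collecting side with catch, which is what I meant by Flow already handling those callbacks.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical stand-in for a callback listener such as SubscriptionListener.
interface TickListener {
    fun onTick(value: Int)
    fun onError(error: Throwable)
    fun onDone()
}

// Hypothetical event source that invokes the callbacks on its own thread,
// similar to how a client library would call back from an executor thread.
class TickSource(private val values: List<Int>, private val failAtEnd: Boolean = false) {
    fun subscribe(listener: TickListener): Thread = thread {
        try {
            values.forEach { listener.onTick(it) }
            if (failAtEnd) listener.onError(IllegalStateException("source failed"))
            else listener.onDone()
        } catch (e: InterruptedException) {
            // Interrupted while blocked in a send: treat it as "unsubscribed".
        }
    }
}

// Wraps the callback API in a Flow using callbackFlow.
fun TickSource.ticks(): Flow<Int> = callbackFlow {
    val listener = object : TickListener {
        // Called from the source's thread; callbackFlow allows sending from there.
        override fun onTick(value: Int) { trySendBlocking(value) }
        // Closing with a cause makes the collector fail with that cause.
        override fun onError(error: Throwable) { close(error) }
        // Closing without a cause completes the flow normally.
        override fun onDone() { close() }
    }
    val worker = subscribe(listener)
    // Keeps the flow open until it is closed or cancelled, then cleans up.
    awaitClose { worker.interrupt() }
}

fun main(): Unit = runBlocking {
    // Normal completion: every value arrives, then the flow ends.
    println(TickSource(listOf(1, 2, 3)).ticks().toList())

    // Failure: values sent before the error are still delivered,
    // then the error reaches the collector, where catch intercepts it.
    var caught: Throwable? = null
    val partial = TickSource(listOf(1, 2), failAtEnd = true).ticks()
        .catch { caught = it }
        .toList()
    println("$partial, error=${caught?.message}")
}
```

The awaitClose block plays the role of subscription.stop() in the answer above: it runs once the flow is closed or the collector is cancelled, so the underlying subscription does not outlive the flow.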