在 Kotlin 中,如何将 "CompletableFuture<Optional<T>>" 转换为 "Flow<T?>"?

In Kotlin, how do I convert "CompletableFuture<Optional<T>>" to "Flow<T?>"?

我正在尝试将 CompletableFuture<Optional<T>> 转换为 Flow<T?>。我要写的扩展函数是

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
    this.toMono().map { (if (it.isPresent) it.get() else null) }.asFlow()

但它失败了,因为 asFlow() 不存在可空类型,AFAICT 基于它的定义。

那么,如何将 CompletableFuture<Optional<T>> 转换为 Flow<T?>

编辑 1:

这是我到目前为止的想法。感谢反馈。

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import java.util.Optional
import java.util.concurrent.CompletableFuture

fun <T> Optional<T>.orNull(): T? = orElse(null)

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> = flowOf(this.join().orNull())

仅供参考,在我的例子中,它使用的是 Axon 的 Kotlin 扩展 queryOptional,我现在可以这样写:

inline fun <reified R, reified Q> findById(q: Q, qgw: QueryGateway): Flow<R?> {
    return qgw.queryOptional<R, Q>(q).asFlowOfNullable()
}

我将暂时推迟使用上述模式创建评论作为允许反馈的答案。

编辑 2: 因为下面指出编辑 1 中的 asFlowOfNullable 会阻塞线程,所以我暂时从@Joffrey 那里得到:

fun <T> Optional<T>.orNull(): T? = orElse(null)

fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> = thenApply { it.orNull() }.asDeferred()

编辑 3:感谢@Tenfour04 和@Joffrey 提供的有用意见。 :)

要使用以下扩展,您需要 jdk8 协程库:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:.5.0"

我不确定您正在使用的 asFlow() 函数来自何处,但我认为没有它也可以使用以下方法。拥有单个项目的 Flow 对我来说似乎有点奇怪,因为它可能只是一个 suspend 函数,或者如果你需要它作为一个对象来传递,一个 Deferred,它用于返回一个结果因此更类似于 Future 而不是 Flow。

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
    flow { emit(await().orElse(null)) }

作为挂起函数:

suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? = 
    await().orElse(null))

作为延期:

fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
    thenApply { it.orElse(null) }.asDeferred()