RxJava/RxKotlin:如果一个源完成(不是全部),则已经完成的 combineLatest
RxJava/RxKotlin: combineLatest that already completes if one source completes (not all)
基本上,我有两个 Flowables F
和 G
,我想对它们使用 combineLatest
,但我希望组合的 Flowable
已经完成,如果 F
完成(即使 G
仍然是 运行)。
这是我用丑陋的解决方案实现的目标的示例:
fun combineFandGbutTerminateIfFTerminates(F: Flowable<Int>, G: Flowable<Int>) : Flowable<Pair<Int, Int>> {
val _F = F.share()
val _G = G.takeUntil(_F.ignoreElements().toFlowable<Nothing>())
val FandG = Flowables.combineLatest(_F, _G)
return FandG
}
我们可以将其提取到扩展函数中:
fun<T> Flowable<T>.completeWith(other: Flowable<*>) : Flowable<T> {
return takeUntil(other.ignoreElements().toFlowable<Nothing>())
}
有更好的表达方式吗?
我得出以下解决方案。它允许将一个主站与多个从站源结合起来。如果 master 完成,则合并的 Flowable
完成。但是,如果从服务器在主服务器之前完成,则会传播错误 SlaveCompletedPrematurelyError
。
class SlaveCompletedPrematurelyError(message: String) : Throwable(message)
/**
* Combine this Flowable with one slave source.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, R> Flowable<T>.combineLatestSlaves(
slaveSource: Flowable<T1>,
combineFunction: (T, T1) -> R
): Flowable<R> = combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource)
/**
* Combine this Flowable with two slave sources.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, R> Flowable<T>.combineLatestSlaves(
slaveSource1: Flowable<T1>,
slaveSource2: Flowable<T2>,
combineFunction: (T, T1, T2) -> R
) =
combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2)
/**
* Combine this Flowable with three slave sources.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, T3, R> Flowable<T>.combineLatestSlaves(
slaveSource1: Flowable<T1>,
slaveSource2: Flowable<T2>,
slaveSource3: Flowable<T3>,
combineFunction: (T, T1, T2, T3) -> R
) =
combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2, slaveSource3)
/**
* Combine this Flowable with many slave sources.
*/
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
combiner: Function<in Array<Any>, out R>,
vararg slaveSources: Publisher<out U>
): Flowable<R> =
combineLatestSlaves(slaveSources, combiner, bufferSize())
/**
* Combine this Flowable with many slave sources.
*
* This function is identical of using combineLatest with this and the slave sources except with the following changes:
* - If this Flowable completes, the resulting Flowable completes even if the slave sources are still running.
* - If a slave source completes before this Flowable, a SlaveCompletedPrematurelyError error is triggered.
*/
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
slaveSources: Array<out Publisher<out U>>,
combiner: Function<in Array<Any>, out R>,
bufferSize: Int
): Flowable<R> {
val masterCompleted = Throwable()
val sources = Array<Publisher<out U>>(slaveSources.size + 1) {
when (it) {
0 -> Flowable.error<U>(masterCompleted).startWith(this)
else -> Flowable.error<U> { SlaveCompletedPrematurelyError(slaveSources[it - 1].toString()) }.startWith(
slaveSources[it - 1]
)
}
}
return combineLatest(sources, combiner, bufferSize).onErrorComplete { it == masterCompleted }
}
/**
* Errors encountered in the stream for which the provided `predicate` returns true will be silently turned into graceful completion.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
inline fun <T> Flowable<T>.onErrorComplete(crossinline predicate: (Throwable) -> Boolean): Flowable<T> =
onErrorResumeNext { error: Throwable ->
if (predicate(error)) Flowable.empty<T>() else Flowable.error<T>(
error
)
}
基本上,我有两个 Flowables F
和 G
,我想对它们使用 combineLatest
,但我希望组合的 Flowable
已经完成,如果 F
完成(即使 G
仍然是 运行)。
这是我用丑陋的解决方案实现的目标的示例:
fun combineFandGbutTerminateIfFTerminates(F: Flowable<Int>, G: Flowable<Int>) : Flowable<Pair<Int, Int>> {
val _F = F.share()
val _G = G.takeUntil(_F.ignoreElements().toFlowable<Nothing>())
val FandG = Flowables.combineLatest(_F, _G)
return FandG
}
我们可以将其提取到扩展函数中:
fun<T> Flowable<T>.completeWith(other: Flowable<*>) : Flowable<T> {
return takeUntil(other.ignoreElements().toFlowable<Nothing>())
}
有更好的表达方式吗?
我得出以下解决方案。它允许将一个主站与多个从站源结合起来。如果 master 完成,则合并的 Flowable
完成。但是,如果从服务器在主服务器之前完成,则会传播错误 SlaveCompletedPrematurelyError
。
class SlaveCompletedPrematurelyError(message: String) : Throwable(message)
/**
* Combine this Flowable with one slave source.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, R> Flowable<T>.combineLatestSlaves(
slaveSource: Flowable<T1>,
combineFunction: (T, T1) -> R
): Flowable<R> = combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource)
/**
* Combine this Flowable with two slave sources.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, R> Flowable<T>.combineLatestSlaves(
slaveSource1: Flowable<T1>,
slaveSource2: Flowable<T2>,
combineFunction: (T, T1, T2) -> R
) =
combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2)
/**
* Combine this Flowable with three slave sources.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, T3, R> Flowable<T>.combineLatestSlaves(
slaveSource1: Flowable<T1>,
slaveSource2: Flowable<T2>,
slaveSource3: Flowable<T3>,
combineFunction: (T, T1, T2, T3) -> R
) =
combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2, slaveSource3)
/**
* Combine this Flowable with many slave sources.
*/
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
combiner: Function<in Array<Any>, out R>,
vararg slaveSources: Publisher<out U>
): Flowable<R> =
combineLatestSlaves(slaveSources, combiner, bufferSize())
/**
* Combine this Flowable with many slave sources.
*
* This function is identical of using combineLatest with this and the slave sources except with the following changes:
* - If this Flowable completes, the resulting Flowable completes even if the slave sources are still running.
* - If a slave source completes before this Flowable, a SlaveCompletedPrematurelyError error is triggered.
*/
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
slaveSources: Array<out Publisher<out U>>,
combiner: Function<in Array<Any>, out R>,
bufferSize: Int
): Flowable<R> {
val masterCompleted = Throwable()
val sources = Array<Publisher<out U>>(slaveSources.size + 1) {
when (it) {
0 -> Flowable.error<U>(masterCompleted).startWith(this)
else -> Flowable.error<U> { SlaveCompletedPrematurelyError(slaveSources[it - 1].toString()) }.startWith(
slaveSources[it - 1]
)
}
}
return combineLatest(sources, combiner, bufferSize).onErrorComplete { it == masterCompleted }
}
/**
* Errors encountered in the stream for which the provided `predicate` returns true will be silently turned into graceful completion.
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
inline fun <T> Flowable<T>.onErrorComplete(crossinline predicate: (Throwable) -> Boolean): Flowable<T> =
onErrorResumeNext { error: Throwable ->
if (predicate(error)) Flowable.empty<T>() else Flowable.error<T>(
error
)
}