flatMapFirst:flatMap 仅当前一个结束时才添加新的 observable

flatMapFirst: flatMap which adds new observable only if the previous one ended

如何实现类似于 flatMapflatMapFirst 运算符,但只有在前一个结束时才添加新的 observable?如果之前的还是运行,它就直接忽略了新的observable。如何在 RxJava 2 中实现?

它已经存在于培根中 - flatMapFirst 在开菲尔中 - flatMapFirst.

您不需要新的运算符,而是现有运算符的组合:

source.onBackpressureLatest().flatMap(function, 1)

FlatMap 将立即 运行 1 个内部源,如果没有需求,onBackpressureLatest 将继续丢弃外部源值(最新的除外),而 flatMap 运行 是第一个内部源。

如果您不想继续使用最新的来源,请考虑改用 onBackpressureDrop

的后续行动。如果您正在寻找一种在 RxJava 2 Observable 中使用 flatMapFirst 而不是 Flowable 的方法,这里有一个快速的 Kotlin 实现:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    toFlowable(BackpressureStrategy.DROP)
        .flatMap({ transform(it).toFlowable(BackpressureStrategy.BUFFER) }, 1)
        .toObservable()

UPD. 基于 David Karnok 的建议的替代实现:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    Observable.defer {
        val busy = AtomicBoolean()
        return@defer this
                .filter { busy.compareAndSet(false, true) }
                .flatMap {
                    transform(it).doAfterTerminate { busy.set(false) }
                }
    }

我设法解决了这个问题:

/**
 * Flatmaps upstream items into [source] items.
 * Ignores upstream items if there is any [source] instance currently running.
 *
 * ```
 * upstream ----u-----u---u-------u---------------|-->
 *              ↓                 ↓               ↓
 * source       ---s-------|->    ---s-------|->  ↓
 *                 ↓                 ↓            ↓
 * result   -------s-----------------s------------|-->
 * ```
 */
fun <T, R> Observable<T>.flatMapWithDrop(source: Observable<R>): Observable<R> {
  return this.toFlowable(BackpressureStrategy.DROP)
    .flatMap({ source.toFlowable(BackpressureStrategy.MISSING) }, 1)
    .toObservable()
}