Rx (RxKotlin) - 使用 groupJoin 的 rightGroupJoin - 合并/合并两个不同类型的可观察对象

Rx (RxKotlin) - rightGroupJoin using groupJoin - merge / combine two observables of different types

折腾了几天,看似简单的任务,来找大家了:)

想法很简单。我有两个 streams/observables、'left' 和 'right'。 我想要 'right' 到 buffer/collect/aggregate 到 'current' 中的项目 'left'.
因此,'left' 中的每个项目定义一个新的 'window',而所有 'right' 项目将绑定到那个 window,直到发出新的 'left' 项目。因此,可视化:

任务:
'left' : |- A - - - - - B - - C - - - -|
'right' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'result':|- - - - - - - -x - - -y - - - -z| (Pair<Left, List<Right>>)
其中:A,1B,4 (所以x); C (so y) 同时发出
所以: x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且: 'right' & 'result' complete/terminate 当 'left' 做
因此: z = Pair(C, [6]) - 作为 'left' 完成

的结果发出

----
编辑 2 - 最终解决方案!
为了将 'right' 项目与下一个 'left' 而不是前一个聚合,我将代码更改为这么多 shorter/simpler 一个:

fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
    return this.share().run {
        zipWith(right.buffer(this), BiFunction { left, rightList ->
            Pair(left, rightList)
        })
    }
}  

编辑 1 - 初始解决方案!
摘自下面@Mark 的(接受的)答案,这是我想出的。
它被分成更小的方法,因为我也做 multiRightGroupJoin() 加入我想要的尽可能多的(正确的)流。

fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
    return this.share().let { thisObservable ->    //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
        thisObservable.flatMapSingle { t ->        //treat each 'left' as a Single
            bufferRightOnSingleLeft(thisObservable, t, right)
        }
    }
}

其中:

private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
    : Single<Pair<T, MutableList<R>>> {

    return right.buffer(left)                              //buffer 'right' until 'left' onNext() (for each 'left' Single) 
        .map { Pair(leftSingleItem, it) }
        .first(Pair(leftSingleItem, emptyList()))   //should be only 1 (list). THINK firstOrError
}  

----

到目前为止我得到了什么
在大量阅读和理解不知何故没有开箱即用的实现之后,我决定使用 groupJoin,主要使用 this link,像这样:(许多问题和这里需要改进的地方,不要使用此代码)

private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {

var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
        .share() //avoid weird side-effects of multiple onSubscribe calls

//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(

    //bind 'right/other' stream to 'this/left'
    right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?

    //define when windows start/end ('this/left' onNext opens new window and closes prev)
    , Function<T, ObservableSource<T>> { thisObservable }

    //define 'right/other' stream to have no windows/intervals/aggregations by itself
    // -> immediately bind each emitted item to a 'current' window(T) above
    , Function<R, ObservableSource<R>> { Observable.empty() }

    //collect the whole 'right' stream in 'current' ('left') window
    , BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
        rObs.collect({ mutableListOf<R>() }) { acc, value ->
            acc.add(value)
        }.map { Pair(t, it.toList()) }

    }).mergeAllSingles()
}  

我也使用类似的用法来创建一个 timedBuffer() - 与 buffer(timeout) 相同,但每个缓冲区都有一个时间戳 (List) 以了解它何时开始。基本上通过 运行 在 Observable.interval(timeout) 上使用相同的代码(作为 'left')

问题/问题(从最简单到最难)

  1. 这是做类似事情的最佳方式吗?是不是有点矫枉过正了?
  2. 当 'left' 完成时,是否有更好的方法(必须)来完成 'result'(和 'right')?没有这个丑陋的布尔逻辑?
  3. 这种用法似乎打乱了 rx 的顺序。请参阅下面的代码并打印:

    leftObservable
    .doOnComplete {
        log("doOnComplete - before join")
     }
    .doOnComplete {
        log("doOnComplete 2 - before join")
     }
    .rightGroupJoin(rightObservable)
    .doOnComplete {
        log("doOnComplete - after join")
     }
    

打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

  1. 上面代码第一次运行没有调用doOnComplete - after join,第二次调用了两次。第三次像第一次,第四次像第二次,依此类推...
    3,4 都是 运行 使用此代码。可能与 subscribe {} 用法有关?请注意,我不拿着一次性用品。 此流结束是因为我对 'left' observable

    进行了 GC
    leftObservable.subscribeOn().observeOn()
    .doOnComplete{log...}
    .rightGroupJoin()
    .doOnComplete{log...}
    .subscribe {}  
    

注意 1:在 mergeAllSingles() 之后添加 .takeUntil { thisCompleted } 似乎修复了 #4。

注意 2:在使用此方法加入多个流并应用 'Note1' 后,很明显 onComplete(在 groupJoin() 调用之前!!!)将被调用多次 'right' Observables,可能意味着原因是 right.takeUntil { thisCompleted },关闭 'right' 流真的很重要吗?

Note3:关于Note1,似乎和takeUntil vs. takeWhile有很大关系。使用 takeWhile 减少了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在努力弄清楚。

  1. 你能想到 multiGroupJoin,或者在我们的例子中,multiRightGroupJoin,而不是 运行ning zip on groupJoin * rightObservablesCount?

想问什么就问什么。我知道我对 subscribe/disposable 和手册 onComplete 的使用不是这样的,我只是不确定什么是..

乍一看我会在这里使用 2 scans。示例:

data class Result(val left: Left?, val rightList: List<Right>) {
    companion object {
        val defaultInstance: Result = Result(null, listOf())
    }
}

leftObservable.switchMap { left -> 
    rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
        .map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult -> 
    Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }

这里唯一的问题是处理onComplete,不知道该怎么做

像这样简单的事情应该可行:

@JvmStatic
fun main(string: Array<String>) {
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<Int>()

    left.flatMapSingle { s ->  right.buffer(left).map { Pair(s, it) }.firstOrError() }
            .subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }


    left.onNext("A")
    right.onNext(1)
    right.onNext(2)
    right.onNext(3)
    left.onNext("B")
    right.onNext(4)
    right.onNext(5)
    left.onNext("C")
    right.onNext(6)
    left.onComplete()
}

输出:

Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]

Observable感兴趣的是左边,订阅吧。然后只需通过左侧可观察对象的下一次发射或完成来缓冲右侧。您只对每个上游左发射的单个结果感兴趣,因此只需使用 flatMapSingle。我选择了 firstOrError() 但显然可以有一个默认项或其他错误处理甚至 flatMapMaybe 加上 firstElement()

编辑

OP 进行了进一步的问答,发现原来的问题和上面的解决方案是用前一个左发射缓冲右值,直到下一个左发射(如上所述),这不是必需的行为。新要求的行为是将右值缓冲到 NEXT 左发射,如下所示:

@JvmStatic
    fun main(string: Array<String>) {
        val left = PublishSubject.create<String>()
        val right = PublishSubject.create<Int>()


        left.zipWith (right.buffer(left), 
                BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
        }).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }

        left.onNext("A")
        right.onNext(1)
        right.onNext(2)
        right.onNext(3)
        left.onNext("B")
        right.onNext(4)
        right.onNext(5)
        left.onNext("C")
        right.onNext(6)
        left.onComplete()
    }

这会产生不同的最终结果,因为左值与之前的右值压缩,直到下一个左发射(反向)。

输出:

Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]