Rx (RxKotlin) - 使用 groupJoin 的 rightGroupJoin - 合并/合并两个不同类型的可观察对象
Rx (RxKotlin) - rightGroupJoin using groupJoin - merge / combine two observables of different types
折腾了几天,看似简单的任务,来找大家了:)
想法很简单。我有两个 streams/observables、'left' 和 'right'。
我想要 'right' 到 buffer/collect/aggregate 到 'current' 中的项目 'left'.
因此,'left' 中的每个项目定义一个新的 'window',而所有 'right' 项目将绑定到那个 window,直到发出新的 'left' 项目。因此,可视化:
任务:
'left' : |- A - - - - - B - - C - - - -|
'right' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'result':|- - - - - - - -x - - -y - - - -z| (Pair<Left, List<Right>>
)
其中:A,1; B,4 (所以x); C (so y) 同时发出
所以: x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且: 'right' & 'result' complete/terminate 当 'left' 做
因此: z = Pair(C, [6]) - 作为 'left' 完成
的结果发出
----
编辑 2 - 最终解决方案!
为了将 'right' 项目与下一个 'left' 而不是前一个聚合,我将代码更改为这么多 shorter/simpler 一个:
fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
return this.share().run {
zipWith(right.buffer(this), BiFunction { left, rightList ->
Pair(left, rightList)
})
}
}
编辑 1 - 初始解决方案!
摘自下面@Mark 的(接受的)答案,这是我想出的。
它被分成更小的方法,因为我也做 multiRightGroupJoin()
加入我想要的尽可能多的(正确的)流。
fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
return this.share().let { thisObservable -> //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
thisObservable.flatMapSingle { t -> //treat each 'left' as a Single
bufferRightOnSingleLeft(thisObservable, t, right)
}
}
}
其中:
private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
: Single<Pair<T, MutableList<R>>> {
return right.buffer(left) //buffer 'right' until 'left' onNext() (for each 'left' Single)
.map { Pair(leftSingleItem, it) }
.first(Pair(leftSingleItem, emptyList())) //should be only 1 (list). THINK firstOrError
}
----
到目前为止我得到了什么
在大量阅读和理解不知何故没有开箱即用的实现之后,我决定使用 groupJoin
,主要使用 this link,像这样:(许多问题和这里需要改进的地方,不要使用此代码)
private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
.share() //avoid weird side-effects of multiple onSubscribe calls
//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(
//bind 'right/other' stream to 'this/left'
right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?
//define when windows start/end ('this/left' onNext opens new window and closes prev)
, Function<T, ObservableSource<T>> { thisObservable }
//define 'right/other' stream to have no windows/intervals/aggregations by itself
// -> immediately bind each emitted item to a 'current' window(T) above
, Function<R, ObservableSource<R>> { Observable.empty() }
//collect the whole 'right' stream in 'current' ('left') window
, BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
rObs.collect({ mutableListOf<R>() }) { acc, value ->
acc.add(value)
}.map { Pair(t, it.toList()) }
}).mergeAllSingles()
}
我也使用类似的用法来创建一个 timedBuffer()
- 与 buffer(timeout)
相同,但每个缓冲区都有一个时间戳 (List
) 以了解它何时开始。基本上通过 运行 在 Observable.interval(timeout)
上使用相同的代码(作为 'left')
问题/问题(从最简单到最难)
- 这是做类似事情的最佳方式吗?是不是有点矫枉过正了?
- 当 'left' 完成时,是否有更好的方法(必须)来完成 'result'(和 'right')?没有这个丑陋的布尔逻辑?
这种用法似乎打乱了 rx 的顺序。请参阅下面的代码并打印:
leftObservable
.doOnComplete {
log("doOnComplete - before join")
}
.doOnComplete {
log("doOnComplete 2 - before join")
}
.rightGroupJoin(rightObservable)
.doOnComplete {
log("doOnComplete - after join")
}
打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join
上面代码第一次运行没有调用doOnComplete - after join
,第二次调用了两次。第三次像第一次,第四次像第二次,依此类推...
3,4 都是 运行 使用此代码。可能与 subscribe {} 用法有关?请注意,我不拿着一次性用品。
此流结束是因为我对 'left' observable
进行了 GC
leftObservable.subscribeOn().observeOn()
.doOnComplete{log...}
.rightGroupJoin()
.doOnComplete{log...}
.subscribe {}
注意 1:在 mergeAllSingles()
之后添加 .takeUntil { thisCompleted }
似乎修复了 #4。
注意 2:在使用此方法加入多个流并应用 'Note1' 后,很明显 onComplete(在 groupJoin() 调用之前!!!)将被调用多次 'right' Observables,可能意味着原因是 right.takeUntil { thisCompleted }
,关闭 'right' 流真的很重要吗?
Note3:关于Note1,似乎和takeUntil vs. takeWhile有很大关系。使用 takeWhile 减少了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在努力弄清楚。
- 你能想到 multiGroupJoin,或者在我们的例子中,multiRightGroupJoin,而不是 运行ning zip on groupJoin * rightObservablesCount?
想问什么就问什么。我知道我对 subscribe/disposable 和手册 onComplete 的使用不是这样的,我只是不确定什么是..
乍一看我会在这里使用 2 scan
s。示例:
data class Result(val left: Left?, val rightList: List<Right>) {
companion object {
val defaultInstance: Result = Result(null, listOf())
}
}
leftObservable.switchMap { left ->
rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
.map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult ->
Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }
这里唯一的问题是处理onComplete
,不知道该怎么做
像这样简单的事情应该可行:
@JvmStatic
fun main(string: Array<String>) {
val left = PublishSubject.create<String>()
val right = PublishSubject.create<Int>()
left.flatMapSingle { s -> right.buffer(left).map { Pair(s, it) }.firstOrError() }
.subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }
left.onNext("A")
right.onNext(1)
right.onNext(2)
right.onNext(3)
left.onNext("B")
right.onNext(4)
right.onNext(5)
left.onNext("C")
right.onNext(6)
left.onComplete()
}
输出:
Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]
您Observable
感兴趣的是左边,订阅吧。然后只需通过左侧可观察对象的下一次发射或完成来缓冲右侧。您只对每个上游左发射的单个结果感兴趣,因此只需使用 flatMapSingle
。我选择了 firstOrError()
但显然可以有一个默认项或其他错误处理甚至 flatMapMaybe
加上 firstElement()
编辑
OP 进行了进一步的问答,发现原来的问题和上面的解决方案是用前一个左发射缓冲右值,直到下一个左发射(如上所述),这不是必需的行为。新要求的行为是将右值缓冲到 NEXT 左发射,如下所示:
@JvmStatic
fun main(string: Array<String>) {
val left = PublishSubject.create<String>()
val right = PublishSubject.create<Int>()
left.zipWith (right.buffer(left),
BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
}).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }
left.onNext("A")
right.onNext(1)
right.onNext(2)
right.onNext(3)
left.onNext("B")
right.onNext(4)
right.onNext(5)
left.onNext("C")
right.onNext(6)
left.onComplete()
}
这会产生不同的最终结果,因为左值与之前的右值压缩,直到下一个左发射(反向)。
输出:
Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]
折腾了几天,看似简单的任务,来找大家了:)
想法很简单。我有两个 streams/observables、'left' 和 'right'。
我想要 'right' 到 buffer/collect/aggregate 到 'current' 中的项目 'left'.
因此,'left' 中的每个项目定义一个新的 'window',而所有 'right' 项目将绑定到那个 window,直到发出新的 'left' 项目。因此,可视化:
任务:
'left' : |- A - - - - - B - - C - - - -|
'right' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'result':|- - - - - - - -x - - -y - - - -z| (Pair<Left, List<Right>>
)
其中:A,1; B,4 (所以x); C (so y) 同时发出
所以: x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且: 'right' & 'result' complete/terminate 当 'left' 做
因此: z = Pair(C, [6]) - 作为 'left' 完成
----
编辑 2 - 最终解决方案!
为了将 'right' 项目与下一个 'left' 而不是前一个聚合,我将代码更改为这么多 shorter/simpler 一个:
fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
return this.share().run {
zipWith(right.buffer(this), BiFunction { left, rightList ->
Pair(left, rightList)
})
}
}
编辑 1 - 初始解决方案!
摘自下面@Mark 的(接受的)答案,这是我想出的。
它被分成更小的方法,因为我也做 multiRightGroupJoin()
加入我想要的尽可能多的(正确的)流。
fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
return this.share().let { thisObservable -> //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
thisObservable.flatMapSingle { t -> //treat each 'left' as a Single
bufferRightOnSingleLeft(thisObservable, t, right)
}
}
}
其中:
private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
: Single<Pair<T, MutableList<R>>> {
return right.buffer(left) //buffer 'right' until 'left' onNext() (for each 'left' Single)
.map { Pair(leftSingleItem, it) }
.first(Pair(leftSingleItem, emptyList())) //should be only 1 (list). THINK firstOrError
}
----
到目前为止我得到了什么
在大量阅读和理解不知何故没有开箱即用的实现之后,我决定使用 groupJoin
,主要使用 this link,像这样:(许多问题和这里需要改进的地方,不要使用此代码)
private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
.share() //avoid weird side-effects of multiple onSubscribe calls
//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(
//bind 'right/other' stream to 'this/left'
right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?
//define when windows start/end ('this/left' onNext opens new window and closes prev)
, Function<T, ObservableSource<T>> { thisObservable }
//define 'right/other' stream to have no windows/intervals/aggregations by itself
// -> immediately bind each emitted item to a 'current' window(T) above
, Function<R, ObservableSource<R>> { Observable.empty() }
//collect the whole 'right' stream in 'current' ('left') window
, BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
rObs.collect({ mutableListOf<R>() }) { acc, value ->
acc.add(value)
}.map { Pair(t, it.toList()) }
}).mergeAllSingles()
}
我也使用类似的用法来创建一个 timedBuffer()
- 与 buffer(timeout)
相同,但每个缓冲区都有一个时间戳 (List
) 以了解它何时开始。基本上通过 运行 在 Observable.interval(timeout)
上使用相同的代码(作为 'left')
问题/问题(从最简单到最难)
- 这是做类似事情的最佳方式吗?是不是有点矫枉过正了?
- 当 'left' 完成时,是否有更好的方法(必须)来完成 'result'(和 'right')?没有这个丑陋的布尔逻辑?
这种用法似乎打乱了 rx 的顺序。请参阅下面的代码并打印:
leftObservable .doOnComplete { log("doOnComplete - before join") } .doOnComplete { log("doOnComplete 2 - before join") } .rightGroupJoin(rightObservable) .doOnComplete { log("doOnComplete - after join") }
打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join
上面代码第一次运行没有调用
进行了 GCdoOnComplete - after join
,第二次调用了两次。第三次像第一次,第四次像第二次,依此类推...
3,4 都是 运行 使用此代码。可能与 subscribe {} 用法有关?请注意,我不拿着一次性用品。 此流结束是因为我对 'left' observableleftObservable.subscribeOn().observeOn() .doOnComplete{log...} .rightGroupJoin() .doOnComplete{log...} .subscribe {}
注意 1:在 mergeAllSingles()
之后添加 .takeUntil { thisCompleted }
似乎修复了 #4。
注意 2:在使用此方法加入多个流并应用 'Note1' 后,很明显 onComplete(在 groupJoin() 调用之前!!!)将被调用多次 'right' Observables,可能意味着原因是 right.takeUntil { thisCompleted }
,关闭 'right' 流真的很重要吗?
Note3:关于Note1,似乎和takeUntil vs. takeWhile有很大关系。使用 takeWhile 减少了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在努力弄清楚。
- 你能想到 multiGroupJoin,或者在我们的例子中,multiRightGroupJoin,而不是 运行ning zip on groupJoin * rightObservablesCount?
想问什么就问什么。我知道我对 subscribe/disposable 和手册 onComplete 的使用不是这样的,我只是不确定什么是..
乍一看我会在这里使用 2 scan
s。示例:
data class Result(val left: Left?, val rightList: List<Right>) {
companion object {
val defaultInstance: Result = Result(null, listOf())
}
}
leftObservable.switchMap { left ->
rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
.map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult ->
Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }
这里唯一的问题是处理onComplete
,不知道该怎么做
像这样简单的事情应该可行:
@JvmStatic
fun main(string: Array<String>) {
val left = PublishSubject.create<String>()
val right = PublishSubject.create<Int>()
left.flatMapSingle { s -> right.buffer(left).map { Pair(s, it) }.firstOrError() }
.subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }
left.onNext("A")
right.onNext(1)
right.onNext(2)
right.onNext(3)
left.onNext("B")
right.onNext(4)
right.onNext(5)
left.onNext("C")
right.onNext(6)
left.onComplete()
}
输出:
Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]
您Observable
感兴趣的是左边,订阅吧。然后只需通过左侧可观察对象的下一次发射或完成来缓冲右侧。您只对每个上游左发射的单个结果感兴趣,因此只需使用 flatMapSingle
。我选择了 firstOrError()
但显然可以有一个默认项或其他错误处理甚至 flatMapMaybe
加上 firstElement()
编辑
OP 进行了进一步的问答,发现原来的问题和上面的解决方案是用前一个左发射缓冲右值,直到下一个左发射(如上所述),这不是必需的行为。新要求的行为是将右值缓冲到 NEXT 左发射,如下所示:
@JvmStatic
fun main(string: Array<String>) {
val left = PublishSubject.create<String>()
val right = PublishSubject.create<Int>()
left.zipWith (right.buffer(left),
BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
}).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }
left.onNext("A")
right.onNext(1)
right.onNext(2)
right.onNext(3)
left.onNext("B")
right.onNext(4)
right.onNext(5)
left.onNext("C")
right.onNext(6)
left.onComplete()
}
这会产生不同的最终结果,因为左值与之前的右值压缩,直到下一个左发射(反向)。
输出:
Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]