Combine - 合并多个共享过滤器
Combine - merging multiple shared filters
我已经使用 RxSwift 有一段时间了,刚刚切换到 Combine,我正试图围绕这个特定的 .filter
行为。这是一个简短的游乐场示例:
import Combine
let publisher = [1, 2, 3, 4, 5]
.publisher
.share()
let filter1 = publisher
.filter { [=10=] == 1 }
.print("filter1")
let filter2 = publisher
.filter { [=10=] == 2 }
.print("filter2")
Publishers
.Merge(filter1, filter2)
.sink {
print("Result is: \([=10=])")
}
输出是
filter1: receive subscription: (Multicast)
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: (Multicast)
filter2: request unlimited
filter2: receive finished
令我惊讶的是 Result is: 2
从未被调用,因为流已结束。我可以删除 .share()
运算符,这将导致收到我期望的两个值
filter1: receive subscription: ([1])
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: ([2])
filter2: request unlimited
filter2: receive value: (2)
Result is: 2
filter2: receive finished
但是,如果我的发布者是 API 调用并且我不想创建重复的网络请求怎么办?这正是我现在要处理的情况,这也是我需要使用 .share()
运算符的原因。
任何更好的解释为什么会发生这种情况以及如何处理您想要过滤流、在每个流中执行单独的逻辑然后将结果合并回一起的情况?
所以这里发生了一些不同的事情。
首先,[1, 2, 3].publisher
与Observable.from([1, 2, 3])
的工作方式不同。后者每个周期发出一次值,而前者连续发出所有值。 Publisher 示例在 Rx 中更像这样工作:
Observable<Int>.create { observer in
[1, 2, 3, 4, 5].forEach {
observer.onNext([=10=])
}
observer.onCompleted()
return Disposables.create()
}
因此,在 Observable.from
的情况下,排放 未 在订阅 filter2 observable 时完成。因此,即使您省略了 share()
,“结果是:1”和“结果是:2”也会被发出。
其次,share()
运算符的工作方式也不同。默认情况下,RxSwift share operator 将在所有订阅被处理后重置 Observable(这是一个引用计数共享)。在 Combine 的情况下,共享运算符使发布者可连接,然后连接到它。本质上,它与 RxSwift 中的 .share(replay: 0, scope: .forever)
运算符相同(我在 Rx BTW 中从来不需要的东西)。
所以相当于您发布的Combine代码的Rx代码实际上是这样的:
let observable = emitSequence([1, 2, 3, 4, 5])
.share(replay: 0, scope: .forever)
let filter1ʹ = observable
.filter { [=11=] == 1 }
.debug("filterʹ1")
let filter2ʹ = observable
.filter { [=11=] == 2 }
.debug("filterʹ2")
Observable.merge(filter1ʹ, filter2ʹ)
.subscribe(onNext: {
print("Resultʹ is: \([=11=])")
})
func emitSequence<S>(_ sequence: S) -> Observable<S.Element> where S: Sequence {
Observable.create { observer in
sequence.forEach {
observer.onNext([=11=])
}
observer.onCompleted()
return Disposables.create()
}
}
所有这些都表明处理 API 呼叫的实际方面很好。在那种情况下,假设调用不会立即 return (它至少需要一个周期)并且因为它是一次性的,只要你确保你没有重新订阅 Observable, share()
不重置不是问题。
我已经使用 RxSwift 有一段时间了,刚刚切换到 Combine,我正试图围绕这个特定的 .filter
行为。这是一个简短的游乐场示例:
import Combine
let publisher = [1, 2, 3, 4, 5]
.publisher
.share()
let filter1 = publisher
.filter { [=10=] == 1 }
.print("filter1")
let filter2 = publisher
.filter { [=10=] == 2 }
.print("filter2")
Publishers
.Merge(filter1, filter2)
.sink {
print("Result is: \([=10=])")
}
输出是
filter1: receive subscription: (Multicast)
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: (Multicast)
filter2: request unlimited
filter2: receive finished
令我惊讶的是 Result is: 2
从未被调用,因为流已结束。我可以删除 .share()
运算符,这将导致收到我期望的两个值
filter1: receive subscription: ([1])
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: ([2])
filter2: request unlimited
filter2: receive value: (2)
Result is: 2
filter2: receive finished
但是,如果我的发布者是 API 调用并且我不想创建重复的网络请求怎么办?这正是我现在要处理的情况,这也是我需要使用 .share()
运算符的原因。
任何更好的解释为什么会发生这种情况以及如何处理您想要过滤流、在每个流中执行单独的逻辑然后将结果合并回一起的情况?
所以这里发生了一些不同的事情。
首先,[1, 2, 3].publisher
与Observable.from([1, 2, 3])
的工作方式不同。后者每个周期发出一次值,而前者连续发出所有值。 Publisher 示例在 Rx 中更像这样工作:
Observable<Int>.create { observer in
[1, 2, 3, 4, 5].forEach {
observer.onNext([=10=])
}
observer.onCompleted()
return Disposables.create()
}
因此,在 Observable.from
的情况下,排放 未 在订阅 filter2 observable 时完成。因此,即使您省略了 share()
,“结果是:1”和“结果是:2”也会被发出。
其次,share()
运算符的工作方式也不同。默认情况下,RxSwift share operator 将在所有订阅被处理后重置 Observable(这是一个引用计数共享)。在 Combine 的情况下,共享运算符使发布者可连接,然后连接到它。本质上,它与 RxSwift 中的 .share(replay: 0, scope: .forever)
运算符相同(我在 Rx BTW 中从来不需要的东西)。
所以相当于您发布的Combine代码的Rx代码实际上是这样的:
let observable = emitSequence([1, 2, 3, 4, 5])
.share(replay: 0, scope: .forever)
let filter1ʹ = observable
.filter { [=11=] == 1 }
.debug("filterʹ1")
let filter2ʹ = observable
.filter { [=11=] == 2 }
.debug("filterʹ2")
Observable.merge(filter1ʹ, filter2ʹ)
.subscribe(onNext: {
print("Resultʹ is: \([=11=])")
})
func emitSequence<S>(_ sequence: S) -> Observable<S.Element> where S: Sequence {
Observable.create { observer in
sequence.forEach {
observer.onNext([=11=])
}
observer.onCompleted()
return Disposables.create()
}
}
所有这些都表明处理 API 呼叫的实际方面很好。在那种情况下,假设调用不会立即 return (它至少需要一个周期)并且因为它是一次性的,只要你确保你没有重新订阅 Observable, share()
不重置不是问题。