Combine - 合并多个共享过滤器

Combine - merging multiple shared filters

我已经使用 RxSwift 有一段时间了,刚刚切换到 Combine,我正试图围绕这个特定的 .filter 行为。这是一个简短的游乐场示例:

import Combine

let publisher = [1, 2, 3, 4, 5]
    .publisher
    .share()

let filter1 = publisher
    .filter { [=10=] == 1 }
    .print("filter1")

let filter2 = publisher
    .filter { [=10=] == 2 }
    .print("filter2")

Publishers
    .Merge(filter1, filter2)
    .sink {
        print("Result is: \([=10=])")
    }

输出是

filter1: receive subscription: (Multicast)
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: (Multicast)
filter2: request unlimited
filter2: receive finished

令我惊讶的是 Result is: 2 从未被调用,因为流已结束。我可以删除 .share() 运算符,这将导致收到我期望的两个值

filter1: receive subscription: ([1])
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: ([2])
filter2: request unlimited
filter2: receive value: (2)
Result is: 2
filter2: receive finished

但是,如果我的发布者是 API 调用并且我不想创建重复的网络请求怎么办?这正是我现在要处理的情况,这也是我需要使用 .share() 运算符的原因。

任何更好的解释为什么会发生这种情况以及如何处理您想要过滤流、在每个流中执行单独的逻辑然后将结果合并回一起的情况?

所以这里发生了一些不同的事情。

首先[1, 2, 3].publisherObservable.from([1, 2, 3]) 的工作方式不同。后者每个周期发出一次值,而前者连续发出所有值。 Publisher 示例在 Rx 中更像这样工作:

Observable<Int>.create { observer in
    [1, 2, 3, 4, 5].forEach {
        observer.onNext([=10=])
    }
    observer.onCompleted()
    return Disposables.create()
}

因此,在 Observable.from 的情况下,排放 在订阅 filter2 observable 时完成。因此,即使您省略了 share(),“结果是:1”和“结果是:2”也会被发出。

其次share()运算符的工作方式也不同。默认情况下,RxSwift share operator 将在所有订阅被处理后重置 Observable(这是一个引用计数共享)。在 Combine 的情况下,共享运算符使发布者可连接,然后连接到它。本质上,它与 RxSwift 中的 .share(replay: 0, scope: .forever) 运算符相同(我在 Rx BTW 中从来不需要的东西)。

所以相当于您发布的Combine代码的Rx代码实际上是这样的:

let observable = emitSequence([1, 2, 3, 4, 5])
    .share(replay: 0, scope: .forever)

let filter1ʹ = observable
    .filter { [=11=] == 1 }
    .debug("filterʹ1")

let filter2ʹ = observable
    .filter { [=11=] == 2 }
    .debug("filterʹ2")

Observable.merge(filter1ʹ, filter2ʹ)
    .subscribe(onNext: {
        print("Resultʹ is: \([=11=])")
    })

func emitSequence<S>(_ sequence: S) -> Observable<S.Element> where S: Sequence {
    Observable.create { observer in
        sequence.forEach {
            observer.onNext([=11=])
        }
        observer.onCompleted()
        return Disposables.create()
    }
}

所有这些都表明处理 API 呼叫的实际方面很好。在那种情况下,假设调用不会立即 return (它至少需要一个周期)并且因为它是一次性的,只要你确保你没有重新订阅 Observable, share() 不重置不是问题。