RxSwift - 执行控制流导致异步操作被执行两次

RxSwift - doing control flow leads to async operation being performed twice

看看这个例子:

func query2() -> Observable<Int> {
    print("query2() called")
    return Observable.create { observer in
        print("creating query2() thread")
        let thread = Thread.init(block: {
            sleep(1)
            let numbers = [
                1,2,3,4,5
            ]
            for num in numbers {
                observer.onNext(num)
            }
            observer.onCompleted()
        })
        thread.start()
        return Disposables.create {
            thread.cancel()
        }
    }
}

let numbers = query2()
let even = numbers.filter { [=10=] % 2 == 0 }
let odd = numbers.filter { [=10=] % 2 != 0 }
let merged = even.concat(odd)

merged.subscribe(onNext: { n in
    print(n)
})

预期输出为:

query2() called
creating query() thread
2
4
1
3
5

但是,当需要从 odd.

中提取值时,线程似乎是第二次创建的

实际输出:

query2() called
creating query2() thread
2
4
creating query2() thread
1
3
5

我看了这段代码,心想 - 啊,我错过了 .share() 运算符,因为 evenodd 是从同一个流派生的。我最初没有添加它,因为我最终将它们合并到一个要订阅的流中 merged,并认为 Rx 会为我进行优化。

所以我使用了 share(): let numbers = query2().share()

输出仍然保持不变。

如何防止这种情况发生?

每次订阅结果 Observable 时都会调用传递给 Observable.create 的闭包。

share() 运算符有一个引用计数。当它收到一个订阅请求时它会订阅它的源,然后如果它收到另一个请求 而源是 运行ning 它也会将事件发送到第二个源。

令人惊讶的是,您的可观察对象在第一次订阅 returns 之前完成,因此没有什么可分享的。请注意,当您在后台线程中调用 onNext 时,它 立即 调用在该后台线程中传递给 subscribe 的闭包。当您调用 onCompleted 时,它会立即完成。它不会等待订阅退出来做这些事情。

此处的解决方案是使用多播运算符使您的 numbers observable 热。向它传递一个 ReplaySubject,它将存储输出并将其重播给任何后续订阅者。不要忘记调用 connect() 来使 Observable 运行 它的生成器函数。

像这样:

let numbers = Observable.deferred { () -> Observable<Int> in
    print("called")
    return Observable.from([1, 2, 3, 4, 5, 6])
}
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .default)) // this ensures that the block passed to `deferred` is called on a background thread.
.multicast(ReplaySubject<Int>.createUnbounded())

numbers.connect()

let even = numbers.filter { [=10=] % 2 == 0 }
let odd = numbers.filter { [=10=] % 2 != 0 }
let merged = even.concat(odd)

merged.subscribe(onNext: { print([=10=]) })