RxSwift:保证上游单一订阅的 share() 替代方案

RxSwift: share() alternative that guarantees single subscription on the upstream

我一直认为 .share(replay: 1, scope: .forever) 共享单个上游订阅,无论有多少下游订阅者。

但是,我刚刚发现,如果下游订阅的计数降为零,它会停止“共享”并释放上游的订阅(因为 refCount() 在幕后使用)。所以当一个新的下游订阅发生时,它必须在上游重新订阅。在以下示例中:

let sut = Observable<Int>
    .create { promise in
        print("create")
        promise.onNext(0)
        return Disposables.create()
    }
    .share(replay: 1, scope: .forever)

sut.subscribe().dispose()
sut.subscribe().dispose()

我希望 create 只打印一次,但打印了两次。如果我删除 .dispose() 个调用 - 只需一次。

如何设置上游保证最多被订阅一次的链?

我不确定为什么 .share(replay: 1, scope: .forever) 没有给出您想要的行为(我也认为它应该像您描述的那样工作)但是没有 share 的其他方式呢?

// You will subscribe to this and not directly on sut (maybe hiding Subject interface to avoid onNext calls from observers)
let subject = ReplaySubject<Int>.create(bufferSize: 1)

let sut = Observable<Int>.create { obs in
  print("Performing work ...")
  obs.onNext(0)
  return Disposables.create()
}

// This subscription is hidden, happens only once and stays alive forever
sut.subscribe(subject)

// Observers subscribe to the public stream
subject.subscribe().dispose()
subject.subscribe().dispose()

您描述的目标暗示您应该使用 multicast(或使用它的运算符之一,如 publish()replay(_:)replayAll())而不是 share...

let sut = Observable<Int>
    .create { observer in
        print("create")
        observer.onNext(0)
        return Disposables.create()
    }
    .replay(1)

let disposable = sut.connect() // subscription will stay alive until dispose() is called on this disposable...

sut.debug("one").subscribe().dispose()
sut.debug("two").subscribe().dispose()

要了解 .forever 和 .whileConnected 之间的区别,请阅读“ShareReplayScope.swift”文件中的文档。两者都被重新计数,但区别在于 re-subscription 运算符的处理方式。这是一些显示差异的测试代码...

class SandboxTests: XCTestCase {
    var scheduler: TestScheduler!
    var observable: Observable<String>!

    override func setUp() {
        super.setUp()
        scheduler = TestScheduler(initialClock: 0)
        // creates an observable that will error on the first subscription, then call `.onNext("A")` on the second.
        observable = scheduler.createObservable(timeline: "-#-A")
    }

    func testWhileConnected() {
        // this shows that re-subscription gets through the while connected share to the source observable
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .whileConnected)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .next(202, "A")
        ])
    }

    func testForever() {
        // however re-subscription doesn't get through on a forever share
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .forever)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .error(201, NSError(domain: "Test Domain", code: -1))
        ])
    }
}