RxSwift:保证上游单一订阅的 share() 替代方案
RxSwift: share() alternative that guarantees single subscription on the upstream
我一直认为 .share(replay: 1, scope: .forever)
共享单个上游订阅,无论有多少下游订阅者。
但是,我刚刚发现,如果下游订阅的计数降为零,它会停止“共享”并释放上游的订阅(因为 refCount()
在幕后使用)。所以当一个新的下游订阅发生时,它必须在上游重新订阅。在以下示例中:
let sut = Observable<Int>
.create { promise in
print("create")
promise.onNext(0)
return Disposables.create()
}
.share(replay: 1, scope: .forever)
sut.subscribe().dispose()
sut.subscribe().dispose()
我希望 create
只打印一次,但打印了两次。如果我删除 .dispose()
个调用 - 只需一次。
如何设置上游保证最多被订阅一次的链?
我不确定为什么 .share(replay: 1, scope: .forever)
没有给出您想要的行为(我也认为它应该像您描述的那样工作)但是没有 share
的其他方式呢?
// You will subscribe to this and not directly on sut (maybe hiding Subject interface to avoid onNext calls from observers)
let subject = ReplaySubject<Int>.create(bufferSize: 1)
let sut = Observable<Int>.create { obs in
print("Performing work ...")
obs.onNext(0)
return Disposables.create()
}
// This subscription is hidden, happens only once and stays alive forever
sut.subscribe(subject)
// Observers subscribe to the public stream
subject.subscribe().dispose()
subject.subscribe().dispose()
您描述的目标暗示您应该使用 multicast
(或使用它的运算符之一,如 publish()
、replay(_:)
或 replayAll()
)而不是 share
...
let sut = Observable<Int>
.create { observer in
print("create")
observer.onNext(0)
return Disposables.create()
}
.replay(1)
let disposable = sut.connect() // subscription will stay alive until dispose() is called on this disposable...
sut.debug("one").subscribe().dispose()
sut.debug("two").subscribe().dispose()
要了解 .forever 和 .whileConnected 之间的区别,请阅读“ShareReplayScope.swift”文件中的文档。两者都被重新计数,但区别在于 re-subscription 运算符的处理方式。这是一些显示差异的测试代码...
class SandboxTests: XCTestCase {
var scheduler: TestScheduler!
var observable: Observable<String>!
override func setUp() {
super.setUp()
scheduler = TestScheduler(initialClock: 0)
// creates an observable that will error on the first subscription, then call `.onNext("A")` on the second.
observable = scheduler.createObservable(timeline: "-#-A")
}
func testWhileConnected() {
// this shows that re-subscription gets through the while connected share to the source observable
let result = scheduler.start { [observable] in
observable!
.share(scope: .whileConnected)
.retry(2)
}
XCTAssertEqual(result.events, [
.next(202, "A")
])
}
func testForever() {
// however re-subscription doesn't get through on a forever share
let result = scheduler.start { [observable] in
observable!
.share(scope: .forever)
.retry(2)
}
XCTAssertEqual(result.events, [
.error(201, NSError(domain: "Test Domain", code: -1))
])
}
}
我一直认为 .share(replay: 1, scope: .forever)
共享单个上游订阅,无论有多少下游订阅者。
但是,我刚刚发现,如果下游订阅的计数降为零,它会停止“共享”并释放上游的订阅(因为 refCount()
在幕后使用)。所以当一个新的下游订阅发生时,它必须在上游重新订阅。在以下示例中:
let sut = Observable<Int>
.create { promise in
print("create")
promise.onNext(0)
return Disposables.create()
}
.share(replay: 1, scope: .forever)
sut.subscribe().dispose()
sut.subscribe().dispose()
我希望 create
只打印一次,但打印了两次。如果我删除 .dispose()
个调用 - 只需一次。
如何设置上游保证最多被订阅一次的链?
我不确定为什么 .share(replay: 1, scope: .forever)
没有给出您想要的行为(我也认为它应该像您描述的那样工作)但是没有 share
的其他方式呢?
// You will subscribe to this and not directly on sut (maybe hiding Subject interface to avoid onNext calls from observers)
let subject = ReplaySubject<Int>.create(bufferSize: 1)
let sut = Observable<Int>.create { obs in
print("Performing work ...")
obs.onNext(0)
return Disposables.create()
}
// This subscription is hidden, happens only once and stays alive forever
sut.subscribe(subject)
// Observers subscribe to the public stream
subject.subscribe().dispose()
subject.subscribe().dispose()
您描述的目标暗示您应该使用 multicast
(或使用它的运算符之一,如 publish()
、replay(_:)
或 replayAll()
)而不是 share
...
let sut = Observable<Int>
.create { observer in
print("create")
observer.onNext(0)
return Disposables.create()
}
.replay(1)
let disposable = sut.connect() // subscription will stay alive until dispose() is called on this disposable...
sut.debug("one").subscribe().dispose()
sut.debug("two").subscribe().dispose()
要了解 .forever 和 .whileConnected 之间的区别,请阅读“ShareReplayScope.swift”文件中的文档。两者都被重新计数,但区别在于 re-subscription 运算符的处理方式。这是一些显示差异的测试代码...
class SandboxTests: XCTestCase {
var scheduler: TestScheduler!
var observable: Observable<String>!
override func setUp() {
super.setUp()
scheduler = TestScheduler(initialClock: 0)
// creates an observable that will error on the first subscription, then call `.onNext("A")` on the second.
observable = scheduler.createObservable(timeline: "-#-A")
}
func testWhileConnected() {
// this shows that re-subscription gets through the while connected share to the source observable
let result = scheduler.start { [observable] in
observable!
.share(scope: .whileConnected)
.retry(2)
}
XCTAssertEqual(result.events, [
.next(202, "A")
])
}
func testForever() {
// however re-subscription doesn't get through on a forever share
let result = scheduler.start { [observable] in
observable!
.share(scope: .forever)
.retry(2)
}
XCTAssertEqual(result.events, [
.error(201, NSError(domain: "Test Domain", code: -1))
])
}
}