Swift Combine:检查Subject是否有观察者?
Swift Combine: Check if Subject has observer?
在 RxSwift 中,我们可以使用 hasObserver
检查 *Subject
是否有任何观察者,我如何在 Combine 中执行此操作,例如一个 PassthroughSubject
?
没有一次需要这个...Apple 没有通过 API 提供这个,实际上,我不推荐这样的东西,因为它就像手动检查 retainCount
的值预 ARC Objective-C 用于代码中的某些决定。
总之是有可能的。让我们将其视为实验室练习。希望有人觉得这有帮助。
免责声明:以下代码未针对所有发布者进行测试,并且对于某些真实世界的项目而言并不安全。这只是方法演示。
因此,由于有多种类型的发布者,而且所有发布者都是最终的和私有的,而且可能来自类型橡皮擦,我们需要适用于任何发布者的通用事物,因此运算符
extension Publisher {
public func countingSubscribers(_ callback: ((Int) -> Void)? = nil)
-> Publishers.SubscribersCounter<Self> {
return Publishers.SubscribersCounter<Self>(upstream: self, callback: callback)
}
}
Operator 使我们有可能在发布者链的任何地方注入并通过回调提供有趣的价值。在我们的例子中,有趣的值将是订阅者的数量。
由于在上游和下游都注入了运算符,我们需要双向自定义管道实现,即。自定义发布者、自定义订阅者、自定义订阅。在我们的例子中,它们必须是透明的,因为我们不需要修改流...实际上它将是 Combine-proxy。
可能的用法:
1)当SubscribersCounter
publisher在链中最后时,numberOfSubscribers
属性可以直接使用
let publisher = NotificationCenter.default
.publisher(for: UIApplication.didBecomeActiveNotification)
.countingSubscribers()
...
publisher.numberOfSubscribers
2) 当它在链的中间某处时,然后接收关于更改的订阅者计数的回调
let publisher = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.google.com")!)
.countingSubscribers({ count in print("Observers: \(count)") })
.receive(on: DispatchQueue.main)
.map { _ in "Data received" }
.replaceError(with: "An error occurred")
实现如下:
import Combine
extension Publishers {
public class SubscribersCounter<Upstream> : Publisher where Upstream : Publisher {
private(set) var numberOfSubscribers = 0
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let callback: ((Int) -> Void)?
public init(upstream: Upstream, callback: ((Int) -> Void)?) {
self.upstream = upstream
self.callback = callback
}
public func receive<S>(subscriber: S) where S : Subscriber,
Upstream.Failure == S.Failure, Upstream.Output == S.Input {
self.increase()
upstream.receive(subscriber: SubscribersCounterSubscriber<S>(counter: self, subscriber: subscriber))
}
fileprivate func increase() {
numberOfSubscribers += 1
self.callback?(numberOfSubscribers)
}
fileprivate func decrease() {
numberOfSubscribers -= 1
self.callback?(numberOfSubscribers)
}
// own subscriber is needed to intercept upstream/downstream events
private class SubscribersCounterSubscriber<S> : Subscriber where S: Subscriber {
let counter: SubscribersCounter<Upstream>
let subscriber: S
init (counter: SubscribersCounter<Upstream>, subscriber: S) {
self.counter = counter
self.subscriber = subscriber
}
deinit {
Swift.print(">> Subscriber deinit")
}
func receive(subscription: Subscription) {
subscriber.receive(subscription: SubscribersCounterSubscription<Upstream>(counter: counter, subscription: subscription))
}
func receive(_ input: S.Input) -> Subscribers.Demand {
return subscriber.receive(input)
}
func receive(completion: Subscribers.Completion<S.Failure>) {
subscriber.receive(completion: completion)
}
typealias Input = S.Input
typealias Failure = S.Failure
}
// own subcription is needed to handle cancel and decrease
private class SubscribersCounterSubscription<Upstream>: Subscription where Upstream: Publisher {
let counter: SubscribersCounter<Upstream>
let wrapped: Subscription
private var cancelled = false
init(counter: SubscribersCounter<Upstream>, subscription: Subscription) {
self.counter = counter
self.wrapped = subscription
}
deinit {
Swift.print(">> Subscription deinit")
if !cancelled {
counter.decrease()
}
}
func request(_ demand: Subscribers.Demand) {
wrapped.request(demand)
}
func cancel() {
wrapped.cancel()
if !cancelled {
cancelled = true
counter.decrease()
}
}
}
}
}
在发布我的问题一段时间后,我写了这个简单的扩展。比@Asperi 的解决方案简单得多。除了简单性(我的)之外,不确定这两种解决方案之间的 disadvantages/advantages。
private enum CounterChange: Int, Equatable {
case increased = 1
case decreased = -1
}
extension Publisher {
func trackNumberOfSubscribers(
_ notifyChange: @escaping (Int) -> Void
) -> AnyPublisher<Output, Failure> {
var counter = NSNumber.init(value: 0)
let nsLock = NSLock()
func updateCounter(_ change: CounterChange, notify: (Int) -> Void) {
nsLock.lock()
counter = NSNumber(value: counter.intValue + change.rawValue)
notify(counter.intValue)
nsLock.unlock()
}
return handleEvents(
receiveSubscription: { _ in updateCounter(.increased, notify: notifyChange) },
receiveCompletion: { _ in updateCounter(.decreased, notify: notifyChange) },
receiveCancel: { updateCounter(.decreased, notify: notifyChange) }
).eraseToAnyPublisher()
}
}
这里有一些测试:
import XCTest
import Combine
final class PublisherTrackNumberOfSubscribersTest: TestCase {
func test_four_subscribers_complete_by_finish() {
doTest { publisher in
publisher.send(completion: .finished)
}
}
func test_four_subscribers_complete_by_error() {
doTest { publisher in
publisher.send(completion: .failure(.init()))
}
}
}
private extension PublisherTrackNumberOfSubscribersTest {
struct EmptyError: Swift.Error {}
func doTest(_ line: UInt = #line, complete: (PassthroughSubject<Int, EmptyError>) -> Void) {
let publisher = PassthroughSubject<Int, EmptyError>()
var numberOfSubscriptions = [Int]()
let trackable = publisher.trackNumberOfSubscribers { counter in
numberOfSubscriptions.append(counter)
}
func subscribe() -> Cancellable {
return trackable.sink(receiveCompletion: { _ in }, receiveValue: { _ in })
}
let cancellable1 = subscribe()
let cancellable2 = subscribe()
let cancellable3 = subscribe()
let cancellable4 = subscribe()
XCTAssertNotNil(cancellable1, line: line)
XCTAssertNotNil(cancellable2, line: line)
XCTAssertNotNil(cancellable3, line: line)
XCTAssertNotNil(cancellable4, line: line)
cancellable1.cancel()
cancellable2.cancel()
complete(publisher)
XCTAssertEqual(numberOfSubscriptions, [1, 2, 3, 4, 3, 2, 1, 0], line: line)
}
}
在 RxSwift 中,我们可以使用 hasObserver
检查 *Subject
是否有任何观察者,我如何在 Combine 中执行此操作,例如一个 PassthroughSubject
?
没有一次需要这个...Apple 没有通过 API 提供这个,实际上,我不推荐这样的东西,因为它就像手动检查 retainCount
的值预 ARC Objective-C 用于代码中的某些决定。
总之是有可能的。让我们将其视为实验室练习。希望有人觉得这有帮助。
免责声明:以下代码未针对所有发布者进行测试,并且对于某些真实世界的项目而言并不安全。这只是方法演示。
因此,由于有多种类型的发布者,而且所有发布者都是最终的和私有的,而且可能来自类型橡皮擦,我们需要适用于任何发布者的通用事物,因此运算符
extension Publisher {
public func countingSubscribers(_ callback: ((Int) -> Void)? = nil)
-> Publishers.SubscribersCounter<Self> {
return Publishers.SubscribersCounter<Self>(upstream: self, callback: callback)
}
}
Operator 使我们有可能在发布者链的任何地方注入并通过回调提供有趣的价值。在我们的例子中,有趣的值将是订阅者的数量。
由于在上游和下游都注入了运算符,我们需要双向自定义管道实现,即。自定义发布者、自定义订阅者、自定义订阅。在我们的例子中,它们必须是透明的,因为我们不需要修改流...实际上它将是 Combine-proxy。
可能的用法:
1)当SubscribersCounter
publisher在链中最后时,numberOfSubscribers
属性可以直接使用
let publisher = NotificationCenter.default
.publisher(for: UIApplication.didBecomeActiveNotification)
.countingSubscribers()
...
publisher.numberOfSubscribers
2) 当它在链的中间某处时,然后接收关于更改的订阅者计数的回调
let publisher = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.google.com")!)
.countingSubscribers({ count in print("Observers: \(count)") })
.receive(on: DispatchQueue.main)
.map { _ in "Data received" }
.replaceError(with: "An error occurred")
实现如下:
import Combine
extension Publishers {
public class SubscribersCounter<Upstream> : Publisher where Upstream : Publisher {
private(set) var numberOfSubscribers = 0
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let callback: ((Int) -> Void)?
public init(upstream: Upstream, callback: ((Int) -> Void)?) {
self.upstream = upstream
self.callback = callback
}
public func receive<S>(subscriber: S) where S : Subscriber,
Upstream.Failure == S.Failure, Upstream.Output == S.Input {
self.increase()
upstream.receive(subscriber: SubscribersCounterSubscriber<S>(counter: self, subscriber: subscriber))
}
fileprivate func increase() {
numberOfSubscribers += 1
self.callback?(numberOfSubscribers)
}
fileprivate func decrease() {
numberOfSubscribers -= 1
self.callback?(numberOfSubscribers)
}
// own subscriber is needed to intercept upstream/downstream events
private class SubscribersCounterSubscriber<S> : Subscriber where S: Subscriber {
let counter: SubscribersCounter<Upstream>
let subscriber: S
init (counter: SubscribersCounter<Upstream>, subscriber: S) {
self.counter = counter
self.subscriber = subscriber
}
deinit {
Swift.print(">> Subscriber deinit")
}
func receive(subscription: Subscription) {
subscriber.receive(subscription: SubscribersCounterSubscription<Upstream>(counter: counter, subscription: subscription))
}
func receive(_ input: S.Input) -> Subscribers.Demand {
return subscriber.receive(input)
}
func receive(completion: Subscribers.Completion<S.Failure>) {
subscriber.receive(completion: completion)
}
typealias Input = S.Input
typealias Failure = S.Failure
}
// own subcription is needed to handle cancel and decrease
private class SubscribersCounterSubscription<Upstream>: Subscription where Upstream: Publisher {
let counter: SubscribersCounter<Upstream>
let wrapped: Subscription
private var cancelled = false
init(counter: SubscribersCounter<Upstream>, subscription: Subscription) {
self.counter = counter
self.wrapped = subscription
}
deinit {
Swift.print(">> Subscription deinit")
if !cancelled {
counter.decrease()
}
}
func request(_ demand: Subscribers.Demand) {
wrapped.request(demand)
}
func cancel() {
wrapped.cancel()
if !cancelled {
cancelled = true
counter.decrease()
}
}
}
}
}
在发布我的问题一段时间后,我写了这个简单的扩展。比@Asperi 的解决方案简单得多。除了简单性(我的)之外,不确定这两种解决方案之间的 disadvantages/advantages。
private enum CounterChange: Int, Equatable {
case increased = 1
case decreased = -1
}
extension Publisher {
func trackNumberOfSubscribers(
_ notifyChange: @escaping (Int) -> Void
) -> AnyPublisher<Output, Failure> {
var counter = NSNumber.init(value: 0)
let nsLock = NSLock()
func updateCounter(_ change: CounterChange, notify: (Int) -> Void) {
nsLock.lock()
counter = NSNumber(value: counter.intValue + change.rawValue)
notify(counter.intValue)
nsLock.unlock()
}
return handleEvents(
receiveSubscription: { _ in updateCounter(.increased, notify: notifyChange) },
receiveCompletion: { _ in updateCounter(.decreased, notify: notifyChange) },
receiveCancel: { updateCounter(.decreased, notify: notifyChange) }
).eraseToAnyPublisher()
}
}
这里有一些测试:
import XCTest
import Combine
final class PublisherTrackNumberOfSubscribersTest: TestCase {
func test_four_subscribers_complete_by_finish() {
doTest { publisher in
publisher.send(completion: .finished)
}
}
func test_four_subscribers_complete_by_error() {
doTest { publisher in
publisher.send(completion: .failure(.init()))
}
}
}
private extension PublisherTrackNumberOfSubscribersTest {
struct EmptyError: Swift.Error {}
func doTest(_ line: UInt = #line, complete: (PassthroughSubject<Int, EmptyError>) -> Void) {
let publisher = PassthroughSubject<Int, EmptyError>()
var numberOfSubscriptions = [Int]()
let trackable = publisher.trackNumberOfSubscribers { counter in
numberOfSubscriptions.append(counter)
}
func subscribe() -> Cancellable {
return trackable.sink(receiveCompletion: { _ in }, receiveValue: { _ in })
}
let cancellable1 = subscribe()
let cancellable2 = subscribe()
let cancellable3 = subscribe()
let cancellable4 = subscribe()
XCTAssertNotNil(cancellable1, line: line)
XCTAssertNotNil(cancellable2, line: line)
XCTAssertNotNil(cancellable3, line: line)
XCTAssertNotNil(cancellable4, line: line)
cancellable1.cancel()
cancellable2.cancel()
complete(publisher)
XCTAssertEqual(numberOfSubscriptions, [1, 2, 3, 4, 3, 2, 1, 0], line: line)
}
}