Combine:对于自定义发布者来说,这是一个很好的技术吗?

Combine: Is this a good technique for a custom publisher?

我正在尝试编写自定义 Combine 发布器,以便更好地了解如何将各种 classes 转换为它们。不可否认,这不是我想做的很多事情,我只是想了解如果需要的话可以怎么做。

我正在使用的场景是我有一个 class 随着时间的推移生成值并且可能有多个订阅者在监听。这不是发布者在请求时生成值,而是在需要时推送值的情况。这可能会发生(例如)在阅读文本时,或者从 UI.

随机输入时

为了测试这一点,我从一个简单的整数生成器开始,它是这样的:

class IntPublisher {
                
    func generate() {
        DispatchQueue.global(qos: .background).async { [weak self] in
            self?.send(0)
            self?.send(1)
            self?.send(2)
            self?.complete()
        }
    }
        
    private func send(_ value: Int) {
        queueOnMain()
    }
        
    func queueOnMain() {
        Thread.sleep(forTimeInterval: 0.5)
        DispatchQueue.main.async { /* ... */ }
    }
}

这里的生成器是 PublisherSubscription:

class IntPublisher: Publisher {
    
    typealias Output = Int
    typealias Failure = Never
    
    class Subscription: Combine.Subscription, Equatable {
        
        private var subscriber: AnySubscriber<Int, Never>?
        private var didFinish: ((Subscription) -> Void)?
        
        init<S>(subscriber: S, didFinish:@escaping (Subscription) -> Void) where S: Subscriber, S.Input == Output, S.Failure == Failure {
            self.subscriber = AnySubscriber(subscriber)
            self.didFinish = didFinish
        }
        
        func request(_ demand: Subscribers.Demand) {
        }
        
        func cancel() {
            finish()
        }
        
        func complete() {
            self.subscriber?.receive(completion: .finished)
            finish()
        }
        
        func finish() {
            didFinish?(self)
            subscriber = nil
            didFinish = nil
        }
        
        func send(_ value: Int) {
            _ = subscriber?.receive(value)
        }
        
        static func == (lhs: PublisherTests.IntPublisher.Subscription, rhs: PublisherTests.IntPublisher.Subscription) -> Bool {
            return lhs.subscriber?.combineIdentifier == rhs.subscriber?.combineIdentifier
        }
    }
    
    var subscriptions = [Subscription]()
    
    func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output {
        
        let subscription = Subscription(subscriber: subscriber) { [weak self] (subscription) in
            self?.subscriptions.remove(subscription)
        }
        
        subscriptions.append(subscription)
        subscriber.receive(subscription: subscription)
    }
    
    func generate() {
        DispatchQueue.global(qos: .background).async { [weak self] in
            self?.send(0)
            self?.send(1)
            self?.send(2)
            self?.complete()
        }
    }
    
    private func send(_ value: Int) {
        queueOnMain { [=11=].send(value) }
    }
    
    private func complete() {
        queueOnMain { [=11=].complete() }
    }
    
    func queueOnMain(_ block: @escaping (Subscription) -> Void) {
        Thread.sleep(forTimeInterval: 0.5)
        DispatchQueue.main.async { self.subscriptions.forEach { block([=11=]) } }
    }
}

我的问题围绕着我必须在发布者中跟踪订阅的方式。因为它正在生成值并需要将它们转发给订阅,所以我必须设置一个数组并将订阅存储在其中。反过来,我必须找到一种方法让订阅在取消或完成时从发布者的数组中删除自己,因为数组有效形成了发布者和订阅之间的循环引用。

在我阅读的所有关于自定义发布的博客中,它们都涵盖了发布者等待订阅者请求值的场景。发布者不需要存储对订阅的引用,因为它传递闭包,他们可以调用闭包来获取值。我的用例不同,因为发布者控制请求,而不是订阅者。

所以我的问题是 - 使用数组是处理这个问题的好方法吗?还是我错过了 Combine 中的某些内容?

New Dev 的想法大大减少了代码。我不知道为什么我没有想到它……哦等等,我想到了。我太专注于实现我完全忘记了考虑围绕主题使用装饰器模式的选项。

无论如何,这是(更简单的)代码:

class IntPublisher2: Publisher {
    
    typealias Output = Int
    typealias Failure = Never
    private let passThroughSubject = PassthroughSubject<Output, Failure>()
            
    func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output {
        passThroughSubject.receive(subscriber: subscriber)
    }
    
    func generate() {
        DispatchQueue.global(qos: .background).async {
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(0)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(1)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(2)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(completion: .finished)
        }
    }
}

Apple 建议 Creating Your Own Publishers。您应该使用 SubjectCurrentValueSubject@Published

的具体子类

例如:

func operation() -> AnyPublisher<String, Error> {
    let subject = PassthroughSubject<String, Error>()
    subject.send("A")
    subject.send("B")
    subject.send("C")
    return subject.eraseToAnyPublisher()
}