如何组成一个联合发布者,将最后发布的值发送给它的第二个订阅者

How to compose a combine publisher that emits the last-emitted value to its 2nd subscriber

我正在尝试构建一个发布者,当后续订阅者连接到它时,它会立即向该订阅者发出最后发出的值,然后继续向所有连接的订阅者发出未来的值。

问:有没有办法做到这一点无需从头编写发布器,而只需组合内置发布器?我可能在这里遗漏了一些东西。

所以,如果我们有一个发布商每秒都在计数:

let counter = Timer.publish(every: 1, on: .main, in: .common)
                   .autoconnect()
                   .scan(0, { v, _ in v + 1 })


let sharedPublisher = // ??? something with counter publisher above

sharedPublisher.sink { print("A: ", [=10=] }.store(in: &bag)

// after 2.5 seconds
sharedPublisher.sink { print("B: ", [=10=] }.store(in: &bag)

输出将是:

A: 1 // at t=1 sec
A: 2 // at t=2
B: 2 // at t=2.5
A: 3 // at t=3
B: 3 // at t=3

最初,我天真地以为我可以只使用 .share.buffer:

let sharedPublisher = counter
                  .share()
                  .buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)

但当然,这不起作用,因为 buffer 仅在下游尚未准备好接受值时才进行缓冲,而此处并非如此。 也许 Record/Record.Recording?

可以共享的发布者(因为它是 class,而不是结构)并立即将其最新值交给任何新订阅者,其最新值是 CurrentValueSubject。所以,例如,这不是你在做什么,但它证明了这一点:

import UIKit
import Combine

func delay(_ delay:Double, closure:@escaping ()->()) {
    let when = DispatchTime.now() + delay
    DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
    var timer : Timer?
    let sub = CurrentValueSubject<Int,Never>(0)
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
            self.sub.value += 1
        }
        delay(0.5) {
            self.sub.sink { print("A:", [=10=]) }.store(in: &self.storage)
        }
        delay(2.5) {
            self.sub.sink { print("B:", [=10=]) }.store(in: &self.storage)
        }
    }
}

输出

A: 0
A: 1
A: 2
B: 2
A: 3
B: 3

这似乎正是您想要的效果。

很酷的是,Subject 也可以是一个操作符(通过发布者的 .subscribe 方法),因此它可以存在于其他东西的下游,同时保持这种行为。唯一的技巧是您必须保留主题(您可以这样做,因为 .subscribe 调用会产生 AnyCancellable)。所以现在我可以你的方式,即从计时器发布者开始:

import UIKit
import Combine

func delay(_ delay:Double, closure:@escaping ()->()) {
    let when = DispatchTime.now() + delay
    DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
    var storage = Set<AnyCancellable>()
    override func viewDidLoad() {
        super.viewDidLoad()
        let counter = Timer.publish(every: 1, on: .main, in: .common)
                           .autoconnect()
                           .scan(0, { v, _ in v + 1 })
        let sub = CurrentValueSubject<Int,Never>(0)
        counter.subscribe(sub).store(in: &self.storage)
        delay(0.5) {
            sub.sink { print("A:", [=12=]) }.store(in: &self.storage)
        }
        delay(2.5) {
            sub.sink { print("B:", [=12=]) }.store(in: &self.storage)
        }
    }
}