如何组成一个联合发布者,将最后发布的值发送给它的第二个订阅者
How to compose a combine publisher that emits the last-emitted value to its 2nd subscriber
我正在尝试构建一个发布者,当后续订阅者连接到它时,它会立即向该订阅者发出最后发出的值,然后继续向所有连接的订阅者发出未来的值。
问:有没有办法做到这一点无需从头编写发布器,而只需组合内置发布器?我可能在这里遗漏了一些东西。
所以,如果我们有一个发布商每秒都在计数:
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sharedPublisher = // ??? something with counter publisher above
sharedPublisher.sink { print("A: ", [=10=] }.store(in: &bag)
// after 2.5 seconds
sharedPublisher.sink { print("B: ", [=10=] }.store(in: &bag)
输出将是:
A: 1 // at t=1 sec
A: 2 // at t=2
B: 2 // at t=2.5
A: 3 // at t=3
B: 3 // at t=3
最初,我天真地以为我可以只使用 .share
和 .buffer
:
let sharedPublisher = counter
.share()
.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
但当然,这不起作用,因为 buffer
仅在下游尚未准备好接受值时才进行缓冲,而此处并非如此。
也许 Record
/Record.Recording
?
可以共享的发布者(因为它是 class,而不是结构)并立即将其最新值交给任何新订阅者,其最新值是 CurrentValueSubject。所以,例如,这不是你在做什么,但它证明了这一点:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var timer : Timer?
let sub = CurrentValueSubject<Int,Never>(0)
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
self.sub.value += 1
}
delay(0.5) {
self.sub.sink { print("A:", [=10=]) }.store(in: &self.storage)
}
delay(2.5) {
self.sub.sink { print("B:", [=10=]) }.store(in: &self.storage)
}
}
}
输出
A: 0
A: 1
A: 2
B: 2
A: 3
B: 3
这似乎正是您想要的效果。
很酷的是,Subject 也可以是一个操作符(通过发布者的 .subscribe
方法),因此它可以存在于其他东西的下游,同时保持这种行为。唯一的技巧是您必须保留主题(您可以这样做,因为 .subscribe
调用会产生 AnyCancellable)。所以现在我可以你的方式,即从计时器发布者开始:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sub = CurrentValueSubject<Int,Never>(0)
counter.subscribe(sub).store(in: &self.storage)
delay(0.5) {
sub.sink { print("A:", [=12=]) }.store(in: &self.storage)
}
delay(2.5) {
sub.sink { print("B:", [=12=]) }.store(in: &self.storage)
}
}
}
我正在尝试构建一个发布者,当后续订阅者连接到它时,它会立即向该订阅者发出最后发出的值,然后继续向所有连接的订阅者发出未来的值。
问:有没有办法做到这一点无需从头编写发布器,而只需组合内置发布器?我可能在这里遗漏了一些东西。
所以,如果我们有一个发布商每秒都在计数:
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sharedPublisher = // ??? something with counter publisher above
sharedPublisher.sink { print("A: ", [=10=] }.store(in: &bag)
// after 2.5 seconds
sharedPublisher.sink { print("B: ", [=10=] }.store(in: &bag)
输出将是:
A: 1 // at t=1 sec
A: 2 // at t=2
B: 2 // at t=2.5
A: 3 // at t=3
B: 3 // at t=3
最初,我天真地以为我可以只使用 .share
和 .buffer
:
let sharedPublisher = counter
.share()
.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
但当然,这不起作用,因为 buffer
仅在下游尚未准备好接受值时才进行缓冲,而此处并非如此。
也许 Record
/Record.Recording
?
可以共享的发布者(因为它是 class,而不是结构)并立即将其最新值交给任何新订阅者,其最新值是 CurrentValueSubject。所以,例如,这不是你在做什么,但它证明了这一点:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var timer : Timer?
let sub = CurrentValueSubject<Int,Never>(0)
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
self.sub.value += 1
}
delay(0.5) {
self.sub.sink { print("A:", [=10=]) }.store(in: &self.storage)
}
delay(2.5) {
self.sub.sink { print("B:", [=10=]) }.store(in: &self.storage)
}
}
}
输出
A: 0
A: 1
A: 2
B: 2
A: 3
B: 3
这似乎正是您想要的效果。
很酷的是,Subject 也可以是一个操作符(通过发布者的 .subscribe
方法),因此它可以存在于其他东西的下游,同时保持这种行为。唯一的技巧是您必须保留主题(您可以这样做,因为 .subscribe
调用会产生 AnyCancellable)。所以现在我可以你的方式,即从计时器发布者开始:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sub = CurrentValueSubject<Int,Never>(0)
counter.subscribe(sub).store(in: &self.storage)
delay(0.5) {
sub.sink { print("A:", [=12=]) }.store(in: &self.storage)
}
delay(2.5) {
sub.sink { print("B:", [=12=]) }.store(in: &self.storage)
}
}
}