订阅返回的发布者后如何触发流程?
How can I trigger a process after a returned publisher would be subscribed?
我有一个 return 发布者的功能。该发布者给出了后台进程的结果。我只想在订阅发布者时触发后台进程,这样就不会丢失任何结果。后台进程可以多次更新其结果,因此带有 Future
的变体不适合。
private let passthroughSubject = PassthroughSubject<Data, Error>()
// This function will be used outside.
func fetchResults() -> AnyPublisher<Data, Error> {
return passthroughSubject
.eraseToAnyPublisher()
.somehowTriggerTheBackgroundProcess()
}
extension MyModule: MyDelegate {
func didUpdateResult(newResult: Data) {
self.passthroughSubject.send(newResult)
}
}
我尝试了什么?
未来:
Future<Data, Error> { [weak self] promise in
self?.passthroughSubject
.sink(receiveCompletion: { completion in
// My logic
}, receiveValue: { value in
// My logic
})
.store(in: &self.cancellableSet)
self?.triggerBackgroundProcess()
}.eraseToAnyPublisher()
按我想要的方式工作,但订阅者只被调用一次(合乎逻辑)。
延期:
Deferred<AnyPublisher<Data, Error>>(createPublisher: { [weak self] in
defer {
self?.triggerBackgroundProcess()
}
return passthroughSubject.eraseToAnyPublisher()
}
调试器显示一切正确:首先 return
然后 trigger
但订阅者不是第一次调用。
receiveSubscription
:
passthroughSubject
.handleEvents(receiveSubscription: { [weak self] subscription in
self?.triggerBackgroundProcess()
})
.eraseToAnyPublisher()
与 Deffered
相同的效果。
有没有可能实现我想要的?
或者,最好创建一个 public 发布者订阅它并从后台进程接收结果。 fetchResults()
函数没有 return 任何东西?
在此先感谢您的帮助。
在我看来,您的最后一段代码是一个完全可行的解决方案:在检测到订阅之前不要触发后台进程。示例:
let subject = PassthroughSubject<String, Never>()
var storage = Set<AnyCancellable>()
func start() {
self.subject
.handleEvents(receiveSubscription: {_ in
print("subscribed")
DispatchQueue.main.async {
self.doSomethingAsynchronous()
}
})
.sink { print("got", [=10=]) }
.store(in: &storage)
}
func doSomethingAsynchronous() {
DispatchQueue.global(qos: .userInitiated).async {
DispatchQueue.main.async {
self.subject.send("bingo")
}
}
}
您可以编写自己的符合Publisher
的类型并包装一个PassthroughSubject
。在您的实现中,您可以在获得订阅时启动后台进程。
public struct MyPublisher: Publisher {
public typealias Output = Data
public typealias Failure = Error
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Input == Output, Downstream.Failure == Failure
{
let subject = PassthroughSubject<Output, Failure>()
subject.subscribe(subscriber)
startBackgroundProcess(subject: subject)
}
private func startBackgroundProcess(subject: PassthroughSubject<Output, Failure>) {
DispatchQueue.global(qos: .utility).async {
print("background process running")
subject.send(Data())
subject.send(completion: .finished)
}
}
}
请注意,此发布者为每个订阅者启动了一个新的后台进程。这是一个常见的实现。例如 URLSession.DataTaskPublisher
为每个订阅者发出一个新请求。如果想让多个订阅者共享一个请求的输出,可以使用.multicast
操作符,添加多个订阅者,然后.connect()
多播发布者启动后台进程一次:
let pub = MyPublisher().multicast { PassthroughSubject() }
pub.sink(...).store(in: &tickets) // first subscriber
pub.sink(...).store(in: &tickets) // second subscriber
pub.connect().store(in: &tickets) // start the background process
我有一个 return 发布者的功能。该发布者给出了后台进程的结果。我只想在订阅发布者时触发后台进程,这样就不会丢失任何结果。后台进程可以多次更新其结果,因此带有 Future
的变体不适合。
private let passthroughSubject = PassthroughSubject<Data, Error>()
// This function will be used outside.
func fetchResults() -> AnyPublisher<Data, Error> {
return passthroughSubject
.eraseToAnyPublisher()
.somehowTriggerTheBackgroundProcess()
}
extension MyModule: MyDelegate {
func didUpdateResult(newResult: Data) {
self.passthroughSubject.send(newResult)
}
}
我尝试了什么?
未来:
Future<Data, Error> { [weak self] promise in
self?.passthroughSubject
.sink(receiveCompletion: { completion in
// My logic
}, receiveValue: { value in
// My logic
})
.store(in: &self.cancellableSet)
self?.triggerBackgroundProcess()
}.eraseToAnyPublisher()
按我想要的方式工作,但订阅者只被调用一次(合乎逻辑)。
延期:
Deferred<AnyPublisher<Data, Error>>(createPublisher: { [weak self] in
defer {
self?.triggerBackgroundProcess()
}
return passthroughSubject.eraseToAnyPublisher()
}
调试器显示一切正确:首先 return
然后 trigger
但订阅者不是第一次调用。
receiveSubscription
:
passthroughSubject
.handleEvents(receiveSubscription: { [weak self] subscription in
self?.triggerBackgroundProcess()
})
.eraseToAnyPublisher()
与 Deffered
相同的效果。
有没有可能实现我想要的?
或者,最好创建一个 public 发布者订阅它并从后台进程接收结果。 fetchResults()
函数没有 return 任何东西?
在此先感谢您的帮助。
在我看来,您的最后一段代码是一个完全可行的解决方案:在检测到订阅之前不要触发后台进程。示例:
let subject = PassthroughSubject<String, Never>()
var storage = Set<AnyCancellable>()
func start() {
self.subject
.handleEvents(receiveSubscription: {_ in
print("subscribed")
DispatchQueue.main.async {
self.doSomethingAsynchronous()
}
})
.sink { print("got", [=10=]) }
.store(in: &storage)
}
func doSomethingAsynchronous() {
DispatchQueue.global(qos: .userInitiated).async {
DispatchQueue.main.async {
self.subject.send("bingo")
}
}
}
您可以编写自己的符合Publisher
的类型并包装一个PassthroughSubject
。在您的实现中,您可以在获得订阅时启动后台进程。
public struct MyPublisher: Publisher {
public typealias Output = Data
public typealias Failure = Error
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Downstream.Input == Output, Downstream.Failure == Failure
{
let subject = PassthroughSubject<Output, Failure>()
subject.subscribe(subscriber)
startBackgroundProcess(subject: subject)
}
private func startBackgroundProcess(subject: PassthroughSubject<Output, Failure>) {
DispatchQueue.global(qos: .utility).async {
print("background process running")
subject.send(Data())
subject.send(completion: .finished)
}
}
}
请注意,此发布者为每个订阅者启动了一个新的后台进程。这是一个常见的实现。例如 URLSession.DataTaskPublisher
为每个订阅者发出一个新请求。如果想让多个订阅者共享一个请求的输出,可以使用.multicast
操作符,添加多个订阅者,然后.connect()
多播发布者启动后台进程一次:
let pub = MyPublisher().multicast { PassthroughSubject() }
pub.sink(...).store(in: &tickets) // first subscriber
pub.sink(...).store(in: &tickets) // second subscriber
pub.connect().store(in: &tickets) // start the background process