使用 .receive(on: DispatchQueue.main) 时不接收输入
Not receiving inputs when using `.receive(on: DispatchQueue.main)`
我正在尝试使用 .receive(on: DispatchQueue.main)
更改为下游的主线程,但是当使用 .subscribe(:)
或 .sink(receiveValue:)
时我没有收到输入。如果我不更改线程,我会收到正确的输入。
出版商
extension URLSessionWebSocketTask {
struct ReceivePublisher: Publisher {
typealias Output = Message
typealias Failure = Error
let task: URLSessionWebSocketTask
func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
task.receive { result in
switch result {
case .success(let message): _ = subscriber.receive(message)
case .failure(let error): subscriber.receive(completion: .failure(error))
}
}
}
}
}
extension URLSessionWebSocketTask {
func receivePublisher() -> ReceivePublisher {
ReceivePublisher(task: self)
}
}
订阅者
extension ViewModel: Subscriber {
typealias Input = URLSessionWebSocketTask.Message
typealias Failure = Error
func receive(subscription: Subscription) {}
func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
// Handle input here.
// When using `.receive(on:)` this method is not called when should be.
return .unlimited
}
func receive(completion: Subscribers.Completion<Error>) {}
}
订阅
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
socketTask.resume()
subscribe<S>(_ subject: S) -> AnyCancellable
返回的AnyCancellable
将在取消初始化后调用cancel()
。因此,如果您不保存它,它将在调用块超出范围时被取消初始化。
在我从 WWDC 看到的视频和教程中,从未解决过如何使用它。我所看到的是人们正在转向 RxSwift 的 DisposeBag
解决方案。
更新 Beta 4:
Combine 现在在 AnyCancellable
上附带了一个名为:store(in:)
的方法,它几乎可以完成我的旧解决方案所做的工作。您可以将 AnyCancellable
存储在一组 AnyCancellable
:
中
var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
super.viewDidLoad()
...
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
.store(in: &cancellables)
}
这样,数组(和所有 AnyCancellable
s)将在包含 class 被取消初始化时被取消初始化。
过时:
如果您想要一个可以更好地使用的所有 Cancellable
的解决方案,您可以这样扩展 Cancellable
:
extension Cancellable {
func cancel(with cancellables: inout [AnyCancellable]) {
if let cancellable = self as? AnyCancellable {
cancellables.append(cancellable)
} else {
cancellables.append(AnyCancellable(self))
}
}
}
我正在尝试使用 .receive(on: DispatchQueue.main)
更改为下游的主线程,但是当使用 .subscribe(:)
或 .sink(receiveValue:)
时我没有收到输入。如果我不更改线程,我会收到正确的输入。
出版商
extension URLSessionWebSocketTask {
struct ReceivePublisher: Publisher {
typealias Output = Message
typealias Failure = Error
let task: URLSessionWebSocketTask
func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
task.receive { result in
switch result {
case .success(let message): _ = subscriber.receive(message)
case .failure(let error): subscriber.receive(completion: .failure(error))
}
}
}
}
}
extension URLSessionWebSocketTask {
func receivePublisher() -> ReceivePublisher {
ReceivePublisher(task: self)
}
}
订阅者
extension ViewModel: Subscriber {
typealias Input = URLSessionWebSocketTask.Message
typealias Failure = Error
func receive(subscription: Subscription) {}
func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
// Handle input here.
// When using `.receive(on:)` this method is not called when should be.
return .unlimited
}
func receive(completion: Subscribers.Completion<Error>) {}
}
订阅
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
socketTask.resume()
subscribe<S>(_ subject: S) -> AnyCancellable
返回的AnyCancellable
将在取消初始化后调用cancel()
。因此,如果您不保存它,它将在调用块超出范围时被取消初始化。
在我从 WWDC 看到的视频和教程中,从未解决过如何使用它。我所看到的是人们正在转向 RxSwift 的 DisposeBag
解决方案。
更新 Beta 4:
Combine 现在在 AnyCancellable
上附带了一个名为:store(in:)
的方法,它几乎可以完成我的旧解决方案所做的工作。您可以将 AnyCancellable
存储在一组 AnyCancellable
:
var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
super.viewDidLoad()
...
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
.store(in: &cancellables)
}
这样,数组(和所有 AnyCancellable
s)将在包含 class 被取消初始化时被取消初始化。
过时:
如果您想要一个可以更好地使用的所有 Cancellable
的解决方案,您可以这样扩展 Cancellable
:
extension Cancellable {
func cancel(with cancellables: inout [AnyCancellable]) {
if let cancellable = self as? AnyCancellable {
cancellables.append(cancellable)
} else {
cancellables.append(AnyCancellable(self))
}
}
}