使用 .receive(on: DispatchQueue.main) 时不接收输入

Not receiving inputs when using `.receive(on: DispatchQueue.main)`

我正在尝试使用 .receive(on: DispatchQueue.main) 更改为下游的主线程,但是当使用 .subscribe(:).sink(receiveValue:) 时我没有收到输入。如果我不更改线程,我会收到正确的输入。

出版商

extension URLSessionWebSocketTask {
  struct ReceivePublisher: Publisher {
    typealias Output = Message
    typealias Failure = Error

    let task: URLSessionWebSocketTask

    func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
      task.receive { result in
        switch result {
        case .success(let message): _ = subscriber.receive(message)
        case .failure(let error): subscriber.receive(completion: .failure(error))
        }
      }
    }
  }
}

extension URLSessionWebSocketTask {
  func receivePublisher() -> ReceivePublisher {
    ReceivePublisher(task: self)
  }
}

订阅者

extension ViewModel: Subscriber {
  typealias Input = URLSessionWebSocketTask.Message
  typealias Failure = Error

  func receive(subscription: Subscription) {}

  func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
    // Handle input here.
    // When using `.receive(on:)` this method is not called when should be.
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Error>) {}
}

订阅

socketTask.receivePublisher()
      .receive(on: DispatchQueue.main)
      .subscribe(viewModel)
socketTask.resume()

subscribe<S>(_ subject: S) -> AnyCancellable 返回的AnyCancellable 将在取消初始化后调用cancel()。因此,如果您不保存它,它将在调用块超出范围时被取消初始化。

在我从 WWDC 看到的视频和教程中,从未解决过如何使用它。我所看到的是人们正在转向 RxSwift 的 DisposeBag 解决方案。

更新 Beta 4: Combine 现在在 AnyCancellable 上附带了一个名为:store(in:) 的方法,它几乎可以完成我的旧解决方案所做的工作。您可以将 AnyCancellable 存储在一组 AnyCancellable:

var cancellables = Set<AnyCancellable>()
...
override func viewDidLoad() {
    super.viewDidLoad()
    ...
    socketTask.receivePublisher()
        .receive(on: DispatchQueue.main)
        .subscribe(viewModel)
        .store(in: &cancellables)
}

这样,数组(和所有 AnyCancellables)将在包含 class 被取消初始化时被取消初始化。

过时:

如果您想要一个可以更好地使用的所有 Cancellable 的解决方案,您可以这样扩展 Cancellable

extension Cancellable {

    func cancel(with cancellables: inout [AnyCancellable]) {
        if let cancellable = self as? AnyCancellable {
            cancellables.append(cancellable)
        } else {
            cancellables.append(AnyCancellable(self))
        }
    }

}