使用 Combine,如何在网络请求后取消分配订阅

With Combine, how to deallocate the Subscription after a network request

如果您将 Combine 用于 URLSession 的网络请求,那么您需要保存 Subscription(又名 AnyCancellable)——否则它会立即被释放,从而取消网络请求。稍后,当处理完网络响应后,您想要取消分配订阅,因为保留它会浪费内存。

下面是执行此操作的一些代码。这有点尴尬,甚至可能不正确。我可以想象一个竞争条件,其中网络请求可以在 sub 设置为非零值之前在另一个线程上启动和完成。

有更好的方法吗?

class SomeThing {
    var subs = Set<AnyCancellable>()
    func sendNetworkRequest() {
        var request: URLRequest = ...
        var sub: AnyCancellable? = nil            
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .sink(
                receiveCompletion: { completion in                
                    self.subs.remove(sub!)
                }, 
                receiveValue: { response in ... }
            }
        subs.insert(sub!)

Below is some code that does this. It's kind of awkward, and it may not even be correct. I can imagine a race condition where network request could start and complete on another thread before sub is set to the non-nil value.

危险! Swift.Set 不是线程安全的。如果你想从两个不同的线程访问一个 Set,你需要序列化访问,这样它们就不会重叠。

一般情况下(尽管 URLSession.DataTaskPublisher 可能不可行)是发布者同步发出信号,甚至在 sink 运算符 returns 之前。这就是 JustResult.PublisherPublishers.Sequence 和其他人的行为方式。因此,这些会产生您所描述的问题,而不涉及线程安全。

现在,如何解决这个问题?如果您认为您实际上不想取消订阅,那么您可以使用 Subscribers.Sink 而不是 sink 运算符来避免创建 AnyCancellable

        URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            .subscribe(Subscribers.Sink(
                receiveCompletion: { completion in ... },
                receiveValue: { response in ... }
            ))

Combine 将在订阅完成后清理订阅和订阅者(​​使用 .finished.failure)。

但是如果您想要取消订阅怎么办?也许有时您的 SomeThing 在订阅完成之前就被销毁了,在这种情况下您不需要完成订阅。然后你确实想创建一个 AnyCancellable 并将它存储在一个实例 属性 中,以便在 SomeThing 被销毁时它被取消。

在那种情况下,设置一个标志,表明sink赢得了比赛,并在存储AnyCancellable之前检查标志。

        var sub: AnyCancellable? = nil
        var isComplete = false
        sub = URLSession.shared.dataTaskPublisher(for: request)
            .map(\.data)
            .decode(type: MyResponse.self, decoder: JSONDecoder())
            // This ensures thread safety, if the subscription is also created
            // on DispatchQueue.main.
            .receive(on: DispatchQueue.main)
            .sink(
                receiveCompletion: { [weak self] completion in
                    isComplete = true
                    if let theSub = sub {
                        self?.subs.remove(theSub)
                    }
                }, 
                receiveValue: { response in ... }
            }
        if !isComplete {
            subs.insert(sub!)
        }

我称这种情况为一次性订户。这个想法是,因为数据任务发布者只发布一次,所以您知道在收到单个值 and/or 完成(错误)后销毁管道是安全的。

这是我喜欢使用的技巧。首先,这是管道的头部:

let url = URL(string:"https://www.apeth.com/pep/manny.jpg")!
let pub : AnyPublisher<UIImage?,Never> =
    URLSession.shared.dataTaskPublisher(for: url)
        .map {[=10=].data}
        .replaceError(with: Data())
        .compactMap { UIImage(data:[=10=]) }
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()

有趣的部分来了。仔细观察:

var cancellable: AnyCancellable? // 1
cancellable = pub.sink(receiveCompletion: {_ in // 2
    cancellable?.cancel() // 3
}) { image in
    self.imageView.image = image
}

你看到我在那里做了什么吗?也许不是,所以我会解释一下:

  1. 首先,我声明一个local AnyCancellable 变量;由于与 Swift 语法规则有关的原因,这需要是可选的。

  2. 然后,我创建我的订阅者并将我的 AnyCancellable 变量设置为该订阅者。同样,出于与 Swift 语法规则有关的原因,我的订阅者需要成为接收器。

  3. 最后,在订阅者本身,我在收到完成时取消了 AnyCancellable。

第三步中的取消实际上做了 两个 事情,除了调用 cancel() — 与内存管理有关的事情:

  • 通过referringcancellable在Sink的异步完成函数里面,我保持cancellable和整个pipelinealive 足以让值从订阅者到达。

  • 通过取消cancellable,我允许管道不存在并防止会导致周围视图控制器的保留循环泄漏.

合并发布者有一个名为 prefix 的实例方法,它执行以下操作:

func prefix(_ maxLength: Int) -> Publishers.Output<Self>

https://developer.apple.com/documentation/combine/publisher/prefix(_:)

playground example