如何:使用 Combine 对后台的 CoreData 更改做出反应

How to: Using Combine to react to CoreData changes in the background

我想实现以下目标:每当有人触发 CoreData 保存(即发送 NSManagedObjectContextDidSave 通知)时,我想执行一些 background 计算基于更改后的 NSManagedObject。具体示例:假设在一个笔记应用中,我想异步计算所有笔记中的总字数。

目前的问题在于 NSManagedObject 上下文明确绑定到线程,不鼓励您在此线程外使用 NSManagedObjects。

我在 SceneDelegate:

中设置了两个 NSManagedObjectContext
let context = (UIApplication.shared.delegate as! AppDelegate).persistentContainer.viewContext
let backgroundContext = (UIApplication.shared.delegate as! AppDelegate).persistentContainer.newBackgroundContext()

我也通过 NotificationCenter.default.publisher(for: .NSManagedObjectContextDidSave) 订阅了通知,并且在我仅触发 一次 后收到保存通知 两次 managedObjectContext.save()。但是,这两个通知都是从同一个线程(即 UIThread)发送的,并且用户字典中的所有 NSManagedObjects 都有一个 .managedObjectContext,它是 viewContext 而不是 backgroundContext .

我的想法是根据关联的 NSManagedObjectContext 是否是后台通知来过滤通知,因为我假设通知也在(私有)DispatchQueue 上发送,但似乎所有通知都已发送在 UIThread 上并且从未使用过后台上下文。

知道如何解决这个问题吗?这是一个错误吗?如何根据 backgroundContext 检索通知,下游任务在关联的 DispatchQueue 上 运行?

你可以把你要观察的对象传给publisher(for:):

NotificationCenter.default
  .publisher(for: .NSManagedObjectContextDidSave, object: backgroundMoc)
  .sink(receiveValue: { notification in
    // handle changes
  })

这将只监听与后台管理对象上下文相关的通知,这意味着您可以安全地对该上下文的队列进行处理。

您可以创建一个发布器,当 Core Data 中与您相关的内容发生变化时,它会通知您。

我写了一篇关于这个的文章。 Combine, Publishers and Core Data.

import Combine
import CoreData
import Foundation

class CDPublisher<Entity>: NSObject, NSFetchedResultsControllerDelegate, Publisher where Entity: NSManagedObject {
    typealias Output = [Entity]
    typealias Failure = Error

    private let request: NSFetchRequest<Entity>
    private let context: NSManagedObjectContext
    private let subject: CurrentValueSubject<[Entity], Failure>
    private var resultController: NSFetchedResultsController<NSManagedObject>?
    private var subscriptions = 0

      init(request: NSFetchRequest<Entity>, context: NSManagedObjectContext) {
        if request.sortDescriptors == nil { request.sortDescriptors = [] }
        self.request = request
        self.context = context
        subject = CurrentValueSubject([])
        super.init()
    }

      func receive<S>(subscriber: S)
        where S: Subscriber, CDPublisher.Failure == S.Failure, CDPublisher.Output == S.Input {
        var start = false

        synchronized(self) {
            subscriptions += 1
            start = subscriptions == 1
        }

        if start {
            let controller = NSFetchedResultsController(fetchRequest: request, managedObjectContext: context, 
                                                        sectionNameKeyPath: nil, cacheName: nil)
            controller.delegate = self

            do {
                try controller.performFetch()
                let result = controller.fetchedObjects ?? []
                subject.send(result)
            } catch {
                subject.send(completion: .failure(error))
            }
            resultController = controller as? NSFetchedResultsController<NSManagedObject>
        }
        CDSubscription(fetchPublisher: self, subscriber: AnySubscriber(subscriber))
    }

      func controllerDidChangeContent(_ controller: NSFetchedResultsController<NSFetchRequestResult>) {
        let result = controller.fetchedObjects as? [Entity] ?? []
        subject.send(result)
    }

      private func dropSubscription() {
        objc_sync_enter(self)
        subscriptions -= 1
        let stop = subscriptions == 0
        objc_sync_exit(self)

        if stop {
            resultController?.delegate = nil
            resultController = nil
        }
    }

    private class CDSubscription: Subscription {
        private var fetchPublisher: CDPublisher?
        private var cancellable: AnyCancellable?

        @discardableResult
        init(fetchPublisher: CDPublisher, subscriber: AnySubscriber<Output, Failure>) {
            self.fetchPublisher = fetchPublisher

            subscriber.receive(subscription: self)

            cancellable = fetchPublisher.subject.sink(receiveCompletion: { completion in
                subscriber.receive(completion: completion)
            }, receiveValue: { value in
                _ = subscriber.receive(value)
            })
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            cancellable?.cancel()
            cancellable = nil
            fetchPublisher?.dropSubscription()
            fetchPublisher = nil
        }
    }

}

如果您不保存上下文两次,那么您必须添加两次观察者。