并行执行 Combine Publishers 会产生竞争条件

Executing Combine Publishers in parallel creates race condition

我正在尝试查询 HealthKit 以获取 HKWorkoutEvent 定义的时间间隔内的心率值和步数,以填充我定义为存储多个变量的自定义本地模型,定义如下。

struct SGWorkoutEvent: Identifiable {
    let id = UUID()
    let type: HKWorkoutEventType
    let splitActiveDurationQuantity: HKQuantity?
    let splitDistanceQuantity: HKQuantity?
    let totalDistanceQuantity: HKQuantity?
    let splitMeasuringSystem: HKUnit

    let steps: HKQuantity?
    let heartRate: HKQuantity?
}

除了 stepsheartRate 之外的 Al 属性可以从 HKWorkoutEvent 中提取。但是,我正在尝试构建一个 Combine 管道,它可以让我创建一个发布者数组来并行查询心率、步数并传递锻炼事件,因此在 sink 中我收到一个 3 元素元组这些值,以便我可以填充上面的模型。我目前拥有的如下,

// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ [=11=].type == .segment })

// For each of the workout segments defined above create a HKStatisticQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({
    healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()

// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({

    healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()

Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)
    
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        
        let d = SGWorkoutEvent(type: pace.type,
                               splitActiveDurationQuantity: pace.splitDuration,
                               splitDistanceQuantity: pace.splitDistance,
                               totalDistanceQuantity: pace.totalDistanceQuantity,
                               splitMeasuringSystem: pace.splitMeasuringSystem,
                               steps: steps.sumQuantity(),
                               heartRate: hrs.averageQuantity())
        
        self.paces.append(d)
    })
    .store(in: &bag)

HKHealthStore.statistic(for:...) 只不过是在 HKHealthStore 扩展上定义的 HKStatisticsQuery 的组合包装器,见下文。

public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
    
    let subject = PassthroughSubject<HKStatistics, Error>()
    
    let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
    
    let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
        
        guard error == nil else {
            hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
            return
        }
        
        subject.send(statistics!)
        subject.send(completion: .finished)
    })
    
    self.execute(query)
    
    return subject.eraseToAnyPublisher()
}

我在这里看到的是某种竞态条件,其中检索到的步数和心率没有同时返回广告。结果,我看到了一些没有意义的值,比如一个 1K 分割的 5' 200steps 和另一个相同持续时间 700steps 的分割。实际情况应该是这两个间隔应该显示 150 左右的值,但似乎我可能没有使用正确的 Combine 运算符。

我希望看到的预期行为是 Publishers.Zip 上的每个发布者让每个 3 项元组按顺序(第一个间隔,第二个间隔...)完成查询,而不是这个非-可复制的竞争条件。

为了尝试提供更多上下文,我认为这类似于针对不同的时间戳建立一个具有温度、湿度和下雨几率的模型,并查询三个不同的 API 端点以检索三个不同的值并将它们合并在模型中。

这里有很多东西要打开,但我会试一试。让我们从您的 HKHealthStore.statistic 函数开始。您想要 运行 一个(可能是异步的)查询,发布一个具有单个结果的序列,然后结束。这看起来确实是使用 Future 的理想情况。我对 HealthKit 没有任何经验(完全),我不能保证这会编译,但是转换 可能 看起来像这样:

public func statistic(
    for type: HKQuantityType,
    with options: HKStatisticsOptions,
    from startDate: Date,
    to endDate: Date,
    _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {

    let future = Future<HKStatistics, Error> {
        fulfillPromise in

        let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])

        let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
            guard error == nil else {
                hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
                fulfillPromise(.failure(error!))
            }

            fulfillPromise(.success(statistics!))
        })

        self.execute(query)
    }

    return future.eraseToAnyPublisher()
}

所以现在我们有一个“一次性”发布者,它 运行 是一个查询并在它有值时触发。

现在让我们看看您的 segmentsWorkoutPublisher(以及扩展 stepsPublisher)。

在使用 Combine 时,如果您发现自己使用 Publisher.<SomeOperatorType> 构造函数,您应该非常小心。根据我的经验,这很少是正确的做法。 (话虽如此,以后使用 Zip3 对我来说似乎没问题)。

在这种情况下,您正在创建一个 Publishers 序列(您的 Futures)。但是你真的对 Publishers 的序列不感兴趣。您对 那些 Publishers 产生的序列感兴趣。从某种意义上说,您想要“解包”每个 Publisher(通过等待其值)并将这些结果发送到序列中。这正是 flatMap 的用途!让我们做这样的事情:

let segmentsWorkoutPublisher =
    workoutSegments
        .map { [=11=].dateInterval }
        .flatMap {
            healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
        }
        .assertNoFailure()

这会生成一串序列,然后等待每个序列发出一个值并将值发送到更远的地方。

stepsPublisher也会以类似的方式改变。

我认为这会让您到达需要去的地方。作为研究的一部分,创建了一个游乐场,我在其中修改了您的示例,但使用了更简化的类型。下次你 运行 遇到这样的问题时,你可能会尝试类似的方法 - 过滤掉多余的细节并尝试创建一个更简单的示例。如果你能像这样在 playground 中将你的代码放在一起,那么编译就不会遇到太多麻烦,这将使回答问题变得更容易。游乐场:

import Foundation
import Combine
import PlaygroundSupport

enum MockEventType : CaseIterable {
    case segment
    case notSegment
}

struct MockSegment {
    let type : MockEventType
    let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}

func statistic() -> AnyPublisher<Float, Never> {
    let future = Future<Float, Never>() {
        fulfillPromise in

        DispatchQueue.global(qos: .background).async {
            sleep(UInt32.random(in: 1...3))
            fulfillPromise(.success(Float.random(in: 100.0...150.0)))
        }
    }

    return future
        .eraseToAnyPublisher()
}

// Generate an endless stream of mock events.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
    .autoconnect()
    .map{ _ in MockSegment(type: MockEventType.allCases.randomElement()!) }

let workoutSegments = rawWorkouts.filter { [=12=].type == .segment }

let dateIntervals =
    workoutSegments
        .map { [=12=].dateInterval }

let segmentsWorkoutPublisher =
        dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

let stepsPublisher =
        dateIntervals
        .flatMap { _ in statistic() }
        .assertNoFailure()

var bag = Set<AnyCancellable>()

Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: { pace, steps, hrs in
        print("pace: \(pace) steps: \(steps), hrs: \(hrs)")
    })
    .store(in: &bag)

PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true