并行执行 Combine Publishers 会产生竞争条件
Executing Combine Publishers in parallel creates race condition
我正在尝试查询 HealthKit 以获取 HKWorkoutEvent
定义的时间间隔内的心率值和步数,以填充我定义为存储多个变量的自定义本地模型,定义如下。
struct SGWorkoutEvent: Identifiable {
let id = UUID()
let type: HKWorkoutEventType
let splitActiveDurationQuantity: HKQuantity?
let splitDistanceQuantity: HKQuantity?
let totalDistanceQuantity: HKQuantity?
let splitMeasuringSystem: HKUnit
let steps: HKQuantity?
let heartRate: HKQuantity?
}
除了 steps
和 heartRate
之外的 Al 属性可以从 HKWorkoutEvent
中提取。但是,我正在尝试构建一个 Combine 管道,它可以让我创建一个发布者数组来并行查询心率、步数并传递锻炼事件,因此在 sink
中我收到一个 3 元素元组这些值,以便我可以填充上面的模型。我目前拥有的如下,
// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ [=11=].type == .segment })
// For each of the workout segments defined above create a HKStatisticQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()
// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({
healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()
Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)
.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in
let d = SGWorkoutEvent(type: pace.type,
splitActiveDurationQuantity: pace.splitDuration,
splitDistanceQuantity: pace.splitDistance,
totalDistanceQuantity: pace.totalDistanceQuantity,
splitMeasuringSystem: pace.splitMeasuringSystem,
steps: steps.sumQuantity(),
heartRate: hrs.averageQuantity())
self.paces.append(d)
})
.store(in: &bag)
HKHealthStore.statistic(for:...)
只不过是在 HKHealthStore
扩展上定义的 HKStatisticsQuery
的组合包装器,见下文。
public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
let subject = PassthroughSubject<HKStatistics, Error>()
let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
guard error == nil else {
hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
return
}
subject.send(statistics!)
subject.send(completion: .finished)
})
self.execute(query)
return subject.eraseToAnyPublisher()
}
我在这里看到的是某种竞态条件,其中检索到的步数和心率没有同时返回广告。结果,我看到了一些没有意义的值,比如一个 1K 分割的 5' 200steps 和另一个相同持续时间 700steps 的分割。实际情况应该是这两个间隔应该显示 150 左右的值,但似乎我可能没有使用正确的 Combine 运算符。
我希望看到的预期行为是 Publishers.Zip
上的每个发布者让每个 3 项元组按顺序(第一个间隔,第二个间隔...)完成查询,而不是这个非-可复制的竞争条件。
为了尝试提供更多上下文,我认为这类似于针对不同的时间戳建立一个具有温度、湿度和下雨几率的模型,并查询三个不同的 API 端点以检索三个不同的值并将它们合并在模型中。
这里有很多东西要打开,但我会试一试。让我们从您的 HKHealthStore.statistic
函数开始。您想要 运行 一个(可能是异步的)查询,发布一个具有单个结果的序列,然后结束。这看起来确实是使用 Future
的理想情况。我对 HealthKit
没有任何经验(完全),我不能保证这会编译,但是转换 可能 看起来像这样:
public func statistic(
for type: HKQuantityType,
with options: HKStatisticsOptions,
from startDate: Date,
to endDate: Date,
_ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
let future = Future<HKStatistics, Error> {
fulfillPromise in
let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
guard error == nil else {
hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
fulfillPromise(.failure(error!))
}
fulfillPromise(.success(statistics!))
})
self.execute(query)
}
return future.eraseToAnyPublisher()
}
所以现在我们有一个“一次性”发布者,它 运行 是一个查询并在它有值时触发。
现在让我们看看您的 segmentsWorkoutPublisher
(以及扩展 stepsPublisher
)。
在使用 Combine 时,如果您发现自己使用 Publisher.<SomeOperatorType>
构造函数,您应该非常小心。根据我的经验,这很少是正确的做法。 (话虽如此,以后使用 Zip3
对我来说似乎没问题)。
在这种情况下,您正在创建一个 Publishers
序列(您的 Futures
)。但是你真的对 Publishers
的序列不感兴趣。您对 值 那些 Publishers
产生的序列感兴趣。从某种意义上说,您想要“解包”每个 Publisher(通过等待其值)并将这些结果发送到序列中。这正是 flatMap
的用途!让我们做这样的事情:
let segmentsWorkoutPublisher =
workoutSegments
.map { [=11=].dateInterval }
.flatMap {
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
}
.assertNoFailure()
这会生成一串序列,然后等待每个序列发出一个值并将值发送到更远的地方。
而stepsPublisher
也会以类似的方式改变。
我认为这会让您到达需要去的地方。作为研究的一部分,创建了一个游乐场,我在其中修改了您的示例,但使用了更简化的类型。下次你 运行 遇到这样的问题时,你可能会尝试类似的方法 - 过滤掉多余的细节并尝试创建一个更简单的示例。如果你能像这样在 playground 中将你的代码放在一起,那么编译就不会遇到太多麻烦,这将使回答问题变得更容易。游乐场:
import Foundation
import Combine
import PlaygroundSupport
enum MockEventType : CaseIterable {
case segment
case notSegment
}
struct MockSegment {
let type : MockEventType
let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}
func statistic() -> AnyPublisher<Float, Never> {
let future = Future<Float, Never>() {
fulfillPromise in
DispatchQueue.global(qos: .background).async {
sleep(UInt32.random(in: 1...3))
fulfillPromise(.success(Float.random(in: 100.0...150.0)))
}
}
return future
.eraseToAnyPublisher()
}
// Generate an endless stream of mock events.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
.autoconnect()
.map{ _ in MockSegment(type: MockEventType.allCases.randomElement()!) }
let workoutSegments = rawWorkouts.filter { [=12=].type == .segment }
let dateIntervals =
workoutSegments
.map { [=12=].dateInterval }
let segmentsWorkoutPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
let stepsPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
var bag = Set<AnyCancellable>()
Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in
print("pace: \(pace) steps: \(steps), hrs: \(hrs)")
})
.store(in: &bag)
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true
我正在尝试查询 HealthKit 以获取 HKWorkoutEvent
定义的时间间隔内的心率值和步数,以填充我定义为存储多个变量的自定义本地模型,定义如下。
struct SGWorkoutEvent: Identifiable {
let id = UUID()
let type: HKWorkoutEventType
let splitActiveDurationQuantity: HKQuantity?
let splitDistanceQuantity: HKQuantity?
let totalDistanceQuantity: HKQuantity?
let splitMeasuringSystem: HKUnit
let steps: HKQuantity?
let heartRate: HKQuantity?
}
除了 steps
和 heartRate
之外的 Al 属性可以从 HKWorkoutEvent
中提取。但是,我正在尝试构建一个 Combine 管道,它可以让我创建一个发布者数组来并行查询心率、步数并传递锻炼事件,因此在 sink
中我收到一个 3 元素元组这些值,以便我可以填充上面的模型。我目前拥有的如下,
// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ [=11=].type == .segment })
// For each of the workout segments defined above create a HKStatisticQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()
// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ [=11=].dateInterval }).map({
healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: [=11=].start, to: [=11=].end)
}))
.assertNoFailure()
Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)
.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in
let d = SGWorkoutEvent(type: pace.type,
splitActiveDurationQuantity: pace.splitDuration,
splitDistanceQuantity: pace.splitDistance,
totalDistanceQuantity: pace.totalDistanceQuantity,
splitMeasuringSystem: pace.splitMeasuringSystem,
steps: steps.sumQuantity(),
heartRate: hrs.averageQuantity())
self.paces.append(d)
})
.store(in: &bag)
HKHealthStore.statistic(for:...)
只不过是在 HKHealthStore
扩展上定义的 HKStatisticsQuery
的组合包装器,见下文。
public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
let subject = PassthroughSubject<HKStatistics, Error>()
let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
guard error == nil else {
hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
return
}
subject.send(statistics!)
subject.send(completion: .finished)
})
self.execute(query)
return subject.eraseToAnyPublisher()
}
我在这里看到的是某种竞态条件,其中检索到的步数和心率没有同时返回广告。结果,我看到了一些没有意义的值,比如一个 1K 分割的 5' 200steps 和另一个相同持续时间 700steps 的分割。实际情况应该是这两个间隔应该显示 150 左右的值,但似乎我可能没有使用正确的 Combine 运算符。
我希望看到的预期行为是 Publishers.Zip
上的每个发布者让每个 3 项元组按顺序(第一个间隔,第二个间隔...)完成查询,而不是这个非-可复制的竞争条件。
为了尝试提供更多上下文,我认为这类似于针对不同的时间戳建立一个具有温度、湿度和下雨几率的模型,并查询三个不同的 API 端点以检索三个不同的值并将它们合并在模型中。
这里有很多东西要打开,但我会试一试。让我们从您的 HKHealthStore.statistic
函数开始。您想要 运行 一个(可能是异步的)查询,发布一个具有单个结果的序列,然后结束。这看起来确实是使用 Future
的理想情况。我对 HealthKit
没有任何经验(完全),我不能保证这会编译,但是转换 可能 看起来像这样:
public func statistic(
for type: HKQuantityType,
with options: HKStatisticsOptions,
from startDate: Date,
to endDate: Date,
_ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
let future = Future<HKStatistics, Error> {
fulfillPromise in
let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
guard error == nil else {
hkCombineLogger.error("Error fetching statistics \(error!.localizedDescription)")
fulfillPromise(.failure(error!))
}
fulfillPromise(.success(statistics!))
})
self.execute(query)
}
return future.eraseToAnyPublisher()
}
所以现在我们有一个“一次性”发布者,它 运行 是一个查询并在它有值时触发。
现在让我们看看您的 segmentsWorkoutPublisher
(以及扩展 stepsPublisher
)。
在使用 Combine 时,如果您发现自己使用 Publisher.<SomeOperatorType>
构造函数,您应该非常小心。根据我的经验,这很少是正确的做法。 (话虽如此,以后使用 Zip3
对我来说似乎没问题)。
在这种情况下,您正在创建一个 Publishers
序列(您的 Futures
)。但是你真的对 Publishers
的序列不感兴趣。您对 值 那些 Publishers
产生的序列感兴趣。从某种意义上说,您想要“解包”每个 Publisher(通过等待其值)并将这些结果发送到序列中。这正是 flatMap
的用途!让我们做这样的事情:
let segmentsWorkoutPublisher =
workoutSegments
.map { [=11=].dateInterval }
.flatMap {
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: [=11=].start, to: [=11=].end)
}
.assertNoFailure()
这会生成一串序列,然后等待每个序列发出一个值并将值发送到更远的地方。
而stepsPublisher
也会以类似的方式改变。
我认为这会让您到达需要去的地方。作为研究的一部分,创建了一个游乐场,我在其中修改了您的示例,但使用了更简化的类型。下次你 运行 遇到这样的问题时,你可能会尝试类似的方法 - 过滤掉多余的细节并尝试创建一个更简单的示例。如果你能像这样在 playground 中将你的代码放在一起,那么编译就不会遇到太多麻烦,这将使回答问题变得更容易。游乐场:
import Foundation
import Combine
import PlaygroundSupport
enum MockEventType : CaseIterable {
case segment
case notSegment
}
struct MockSegment {
let type : MockEventType
let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}
func statistic() -> AnyPublisher<Float, Never> {
let future = Future<Float, Never>() {
fulfillPromise in
DispatchQueue.global(qos: .background).async {
sleep(UInt32.random(in: 1...3))
fulfillPromise(.success(Float.random(in: 100.0...150.0)))
}
}
return future
.eraseToAnyPublisher()
}
// Generate an endless stream of mock events.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
.autoconnect()
.map{ _ in MockSegment(type: MockEventType.allCases.randomElement()!) }
let workoutSegments = rawWorkouts.filter { [=12=].type == .segment }
let dateIntervals =
workoutSegments
.map { [=12=].dateInterval }
let segmentsWorkoutPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
let stepsPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
var bag = Set<AnyCancellable>()
Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in
print("pace: \(pace) steps: \(steps), hrs: \(hrs)")
})
.store(in: &bag)
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true