Combine's receive(on:) not dispatching to serial queue, causing data race
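According to Apple, receive(on:options:) runs callbacks on a given queue. In the code below, we use a serial dispatch queue to prevent races on localOptionalCancellable. But receiveCancel is not dispatched to that queue. Can anyone tell me why?
From the documentation: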
You use the receive(on:options:) operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop.
...
Prefer receive(on:options:) over explicit use of dispatch queues when performing work in subscribers. For example, instead of the following pattern:
Reproduction:
import Foundation
import Combine

class Example {
    private var localOptionalCancellable: AnyCancellable?
    private let dispatchQueue = DispatchQueue(label: "LocalQueue-\(UUID())")

    func misbehavingFunction() {
        self.dispatchQueue.async {
            self.localOptionalCancellable = Just(())
                .setFailureType(to: Error.self)
                .receive(on: self.dispatchQueue)
                .handleEvents(
                    receiveCancel: {
                        // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
                        // Can be fixed by wrapping in self.dispatchQueue.async {}
                        self.localOptionalCancellable = nil
                    }
                )
                .sink(
                    receiveCompletion: { _ in },
                    receiveValue: { _ in
                        self.localOptionalCancellable = nil
                    }
                )
        }
    }
}

Example().misbehavingFunction()
Stack trace:
Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
Previous access (a modification) started at (0x10eeaf12a).
Current access (a modification) started at:
0 libswiftCore.dylib 0x00007fff2ff7be50 swift_beginAccess + 568
3 Combine 0x00007fff4ba73a40 Publishers.HandleEvents.Inner.cancel() + 71
4 Combine 0x00007fff4ba74230 protocol witness for Cancellable.cancel() in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
5 Combine 0x00007fff4b9f10c0 Subscribers.Sink.cancel() + 652
6 Combine 0x00007fff4b9f1500 protocol witness for Cancellable.cancel() in conformance Subscribers.Sink<A, B> + 16
7 Combine 0x00007fff4b9dd2d0 AnyCancellable.cancel() + 339
8 Combine 0x00007fff4b9dd5f0 AnyCancellable.__deallocating_deinit + 9
9 libswiftCore.dylib 0x00007fff2ff7da20 _swift_release_dealloc + 16
13 Combine 0x00007fff4b9f0da0 Subscribers.Sink.receive(_:) + 54
14 Combine 0x00007fff4b9f14c0 protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> + 16
15 Combine 0x00007fff4ba73ed0 Publishers.HandleEvents.Inner.receive(_:) + 129
16 Combine 0x00007fff4ba74170 protocol witness for Subscriber.receive(_:) in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
17 Combine 0x00007fff4ba26440 closure #1 in Publishers.ReceiveOn.Inner.receive(_:) + 167
18 libswiftDispatch.dylib 0x000000010e97cad0 thunk for @escaping @callee_guaranteed () -> () + 14
19 libdispatch.dylib 0x00007fff20105323 _dispatch_call_block_and_release + 12
20 libdispatch.dylib 0x00007fff20106500 _dispatch_client_callout + 8
21 libdispatch.dylib 0x00007fff2010c12e _dispatch_lane_serial_drain + 715
22 libdispatch.dylib 0x00007fff2010cde1 _dispatch_lane_invoke + 403
23 libdispatch.dylib 0x00007fff20117269 _dispatch_workloop_worker_thread + 782
24 libsystem_pthread.dylib 0x00007fff6116391b _pthread_wqthread + 290
25 libsystem_pthread.dylib 0x00007fff61162b68 start_wqthread + 15
Fatal access conflict detected.
According to Apple, receive(on:options:) runs callbacks on a given queue.
Not quite. Here is what the documentation actually says:
You use the receive(on:options:) operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop. In contrast with subscribe(on:options:), which affects upstream messages, receive(on:options:) changes the execution context of downstream messages.
(Emphasis added.) So receive(on:) controls the Scheduler used to call a Subscriber's receive(_:) and receive(completion:) methods. It does not control the Scheduler used to call the Subscription's request(_:) or cancel() methods.
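To make the distinction concrete, here is a minimal sketch that records where a value callback and a cancel callback run when only receive(on:) is used. The function name observeContexts, the queue label, and the use of a PassthroughSubject are illustrative assumptions, not part of the original question. The queue is tagged with DispatchSpecificKey so each closure can check whether it is running on that queue.

```swift
import Combine
import Dispatch

// Hypothetical helper: reports whether the value callback and the cancel
// callback ran on the serial queue passed to receive(on:).
func observeContexts() -> (valueOnQueue: Bool, cancelOnQueue: Bool) {
    let queue = DispatchQueue(label: "example.serial")
    // Tag the queue so a closure can ask "am I running on `queue`?"
    let key = DispatchSpecificKey<Bool>()
    queue.setSpecific(key: key, value: true)

    var valueOnQueue = false
    var cancelOnQueue = false
    let valueDelivered = DispatchSemaphore(value: 0)

    let subject = PassthroughSubject<Int, Never>()
    let cancellable = subject
        .receive(on: queue)
        .handleEvents(receiveCancel: {
            cancelOnQueue = DispatchQueue.getSpecific(key: key) == true
        })
        .sink { _ in
            valueOnQueue = DispatchQueue.getSpecific(key: key) == true
            valueDelivered.signal()
        }

    subject.send(1)        // downstream message: delivered via `queue`
    valueDelivered.wait()  // wait until the sink has seen the value
    cancellable.cancel()   // upstream message: runs on the calling thread
    return (valueOnQueue, cancelOnQueue)
}

let contexts = observeContexts()
print("value on queue:", contexts.valueOnQueue)
print("cancel on queue:", contexts.cancelOnQueue)
```

Under these assumptions the value callback should report that it ran on the queue, while the cancel callback should report that it did not, because cancel() travels upstream synchronously from whichever thread calls it.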
To control the Scheduler used to call the Subscription's cancel() method, you need to use the subscribe(on:options:) operator downstream of handleEvents, like this:
self.localOptionalCancellable = Just(())
    .setFailureType(to: Error.self)
    .receive(on: self.dispatchQueue)
    .handleEvents(
        receiveCancel: {
            // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
            // Can be fixed by wrapping in self.dispatchQueue.async {}
            self.localOptionalCancellable = nil
        }
    )
    .subscribe(on: self.dispatchQueue)
    // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { _ in
            self.localOptionalCancellable = nil
        }
    )
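As a rough check of this fix, the sketch below places subscribe(on:) downstream of handleEvents and records where receiveCancel runs. The function name observeCancelWithSubscribeOn, the queue label, and the PassthroughSubject are hypothetical stand-ins for the code in the question. Because subscribe(on:) forwards cancel() asynchronously, the sketch waits on a semaphore before reading the result.

```swift
import Combine
import Dispatch

// Hypothetical helper: returns true if receiveCancel ran on the serial queue
// passed to subscribe(on:).
func observeCancelWithSubscribeOn() -> Bool {
    let queue = DispatchQueue(label: "example.serial")
    // Tag the queue so a closure can ask "am I running on `queue`?"
    let key = DispatchSpecificKey<Bool>()
    queue.setSpecific(key: key, value: true)

    var cancelOnQueue = false
    let cancelHandled = DispatchSemaphore(value: 0)

    let subject = PassthroughSubject<Int, Never>()
    let cancellable = subject
        .receive(on: queue)
        .handleEvents(receiveCancel: {
            cancelOnQueue = DispatchQueue.getSpecific(key: key) == true
            cancelHandled.signal()
        })
        .subscribe(on: queue)  // subscribe, request, and cancel go via `queue`
        .sink { _ in }

    queue.sync {}              // let the asynchronous subscription finish first
    cancellable.cancel()       // forwarded upstream through `queue`
    cancelHandled.wait()       // receiveCancel now runs asynchronously
    return cancelOnQueue
}

print("cancel on queue:", observeCancelWithSubscribeOn())
```

With subscribe(on:) in place, the cancel callback should report that it ran on the queue. Note that the caller of cancel() no longer observes the effect of receiveCancel immediately, since the work is scheduled rather than run inline.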