为什么 for 循环中并发队列上的 .async 行为与 DispatchQueue.concurrentPerform 不同?
Why doesn't .async on a concurrent queue in a for loop behave the same as DispatchQueue.concurrentPerform?
import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
如果我 运行 下面的代码,即使我正在并发读取,也会按预期将 10,000 个元素附加到数组中:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ threadSafeArray.get
threadSafeArray.append(newElement: i)
}
但是当我这样做时,只是它永远不会接近添加 10,000 个元素(上次我 运行 它在我的计算机上只添加了 92 个元素)。
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
为什么前者有效,为什么后者无效?
似乎我正在经历线程爆炸,因为正在创建 82 个线程并且应用程序 运行 没有线程,我使用的解决方案是一个信号量来限制线程数:
let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
semaphore.signal()
}
semaphore.wait()
}
编辑: Rob 的回答解释了上述代码的一些问题
很高兴您找到了线程爆炸的解决方案。请参阅有关线程爆炸的讨论 WWDC 2015 Building Responsive and Efficient Apps with GCD and again in WWDC 2016 Concurrent Programming With GCD in Swift 3。
话虽这么说,DispatchSemaphore
现在有点反模式,因为存在 concurrentPerform
(或 OperationQueue
及其 maxConcurrentOperationCount
或结合其 maxPublishers
)。所有这些都比分派信号量更优雅地管理并发度。
说了这么多,关于 的一些观察:
使用此 DispatchSemaphore
模式时,通常将 wait
放在 concurrent.async { ... }
之前(因为,作为写,你得到九个并发操作,而不是八个,这有点误导)。
这里更深层次的问题是你已经减少了计数问题,但它仍然存在。考虑:
let threadSafeArray = SynchronizedArray<Int>()
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
print(threadSafeArray.get.count)
当你离开 for
循环时,你仍然可以在 concurrent
上有最多八个异步任务仍然 运行,并且 count
(与关于 concurrent
队列)仍然可以少于 10,000。您必须添加另一个 concurrent.async(flags: .barrier) { ... }
,这只是添加第二层同步。例如
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
concurrent.async(flags: .barrier) {
print(threadSafeArray.get.count)
}
或者您可以使用 DispatchGroup
,用于确定一系列异步调度块何时完成的经典机制:
let semaphore = DispatchSemaphore(value: 8)
let group = DispatchGroup()
for i in 0..<10000 {
semaphore.wait()
concurrent.async(group: group) {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
group.notify(queue: .main) {
print(threadSafeArray.get.count)
}
使用 concurrentPerform
消除了对这两种模式的需要,因为在所有并发任务完成之前它不会继续执行。 (它还会根据您设备的核心数自动优化并发度。)
FWIW,SynchronizedArray
的一个更好的替代方法是根本不公开底层数组,只实现您想要公开的任何方法,集成必要的同步。它使调用站点更干净,并解决了许多问题。
例如,假设您想公开下标运算符和一个 count
变量,您可以这样做:
class SynchronizedArray<T> {
private var array: [T]
private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
init(_ array: [T] = []) {
self.array = array
}
subscript(index: Int) -> T {
get { reader { [=13=][index] } }
set { writer { [=13=][index] = newValue } }
}
var count: Int {
reader { [=13=].count }
}
func append(newElement: T) {
writer { [=13=].append(newElement) }
}
func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
try accessQueue.sync { try block(array) }
}
func writer(_ block: @escaping (inout [T]) -> Void) {
accessQueue.async(flags: .barrier) { block(&self.array) }
}
}
这解决了各种问题。例如,您现在可以:
print(threadSafeArray.count) // get the count
print(threadSafeArray[500]) // get the 500th item
您现在还可以执行以下操作:
let average = threadSafeArray.reader { array -> Double in
let sum = array.reduce(0, +)
return Double(sum) / Double(array.count)
}
但是,归根结底,在处理集合(或任何可变对象)时,您总是不想公开可变对象本身,而是为常见操作编写自己的同步方法(下标,count
、removeAll
等),并且在应用程序开发人员可能需要更广泛的同步机制的情况下,还可能公开 reader/writer 接口。
(FWIW,对此 SynchronizedArray
的更改适用于信号量或 concurrentPerform
场景;只是信号量恰好在这种情况下显示了问题。)
不用说,您通常也会在每个线程上完成更多工作,因为尽管上下文切换开销不大,但它很可能足以抵消并行处理带来的任何优势. (但我知道这可能只是问题的概念性演示,而不是建议的实施。)仅供未来读者参考。
import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
如果我 运行 下面的代码,即使我正在并发读取,也会按预期将 10,000 个元素附加到数组中:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ threadSafeArray.get
threadSafeArray.append(newElement: i)
}
但是当我这样做时,只是它永远不会接近添加 10,000 个元素(上次我 运行 它在我的计算机上只添加了 92 个元素)。
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
为什么前者有效,为什么后者无效?
似乎我正在经历线程爆炸,因为正在创建 82 个线程并且应用程序 运行 没有线程,我使用的解决方案是一个信号量来限制线程数:
let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
semaphore.signal()
}
semaphore.wait()
}
编辑: Rob 的回答解释了上述代码的一些问题
很高兴您找到了线程爆炸的解决方案。请参阅有关线程爆炸的讨论 WWDC 2015 Building Responsive and Efficient Apps with GCD and again in WWDC 2016 Concurrent Programming With GCD in Swift 3。
话虽这么说,DispatchSemaphore
现在有点反模式,因为存在 concurrentPerform
(或 OperationQueue
及其 maxConcurrentOperationCount
或结合其 maxPublishers
)。所有这些都比分派信号量更优雅地管理并发度。
说了这么多,关于
使用此
DispatchSemaphore
模式时,通常将wait
放在concurrent.async { ... }
之前(因为,作为写,你得到九个并发操作,而不是八个,这有点误导)。这里更深层次的问题是你已经减少了计数问题,但它仍然存在。考虑:
let threadSafeArray = SynchronizedArray<Int>() let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent) let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } print(threadSafeArray.get.count)
当你离开
for
循环时,你仍然可以在concurrent
上有最多八个异步任务仍然 运行,并且count
(与关于concurrent
队列)仍然可以少于 10,000。您必须添加另一个concurrent.async(flags: .barrier) { ... }
,这只是添加第二层同步。例如let semaphore = DispatchSemaphore(value: 8) for i in 0..<10000 { semaphore.wait() concurrent.async { threadSafeArray.append(newElement: i) semaphore.signal() } } concurrent.async(flags: .barrier) { print(threadSafeArray.get.count) }
或者您可以使用
DispatchGroup
,用于确定一系列异步调度块何时完成的经典机制:let semaphore = DispatchSemaphore(value: 8) let group = DispatchGroup() for i in 0..<10000 { semaphore.wait() concurrent.async(group: group) { threadSafeArray.append(newElement: i) semaphore.signal() } } group.notify(queue: .main) { print(threadSafeArray.get.count) }
使用
concurrentPerform
消除了对这两种模式的需要,因为在所有并发任务完成之前它不会继续执行。 (它还会根据您设备的核心数自动优化并发度。)FWIW,
SynchronizedArray
的一个更好的替代方法是根本不公开底层数组,只实现您想要公开的任何方法,集成必要的同步。它使调用站点更干净,并解决了许多问题。例如,假设您想公开下标运算符和一个
count
变量,您可以这样做:class SynchronizedArray<T> { private var array: [T] private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent) init(_ array: [T] = []) { self.array = array } subscript(index: Int) -> T { get { reader { [=13=][index] } } set { writer { [=13=][index] = newValue } } } var count: Int { reader { [=13=].count } } func append(newElement: T) { writer { [=13=].append(newElement) } } func reader<U>(_ block: ([T]) throws -> U) rethrows -> U { try accessQueue.sync { try block(array) } } func writer(_ block: @escaping (inout [T]) -> Void) { accessQueue.async(flags: .barrier) { block(&self.array) } } }
这解决了各种问题。例如,您现在可以:
print(threadSafeArray.count) // get the count print(threadSafeArray[500]) // get the 500th item
您现在还可以执行以下操作:
let average = threadSafeArray.reader { array -> Double in let sum = array.reduce(0, +) return Double(sum) / Double(array.count) }
但是,归根结底,在处理集合(或任何可变对象)时,您总是不想公开可变对象本身,而是为常见操作编写自己的同步方法(下标,
count
、removeAll
等),并且在应用程序开发人员可能需要更广泛的同步机制的情况下,还可能公开 reader/writer 接口。(FWIW,对此
SynchronizedArray
的更改适用于信号量或concurrentPerform
场景;只是信号量恰好在这种情况下显示了问题。)不用说,您通常也会在每个线程上完成更多工作,因为尽管上下文切换开销不大,但它很可能足以抵消并行处理带来的任何优势. (但我知道这可能只是问题的概念性演示,而不是建议的实施。)仅供未来读者参考。