为什么 for 循环中并发队列上的 .async 行为与 DispatchQueue.concurrentPerform 不同?

Why doesn't .async on a concurrent queue in a for loop behave the same as DispatchQueue.concurrentPerform?

import Dispatch

class SynchronizedArray<T> {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
    
    var get: [T] {
        accessQueue.sync {
            array
        }
    }
    
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) {
            self.array.append(newElement)
        }
    }
}

如果我 运行 下面的代码,即使我正在并发读取,也会按预期将 10,000 个元素附加到数组中:

DispatchQueue.concurrentPerform(iterations: 10000) { i in
    _ threadSafeArray.get
    threadSafeArray.append(newElement: i)
}

但是当我这样做时,只是它永远不会接近添加 10,000 个元素(上次我 运行 它在我的计算机上只添加了 92 个元素)。

let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
    }
}

为什么前者有效,为什么后者无效?

似乎我正在经历线程爆炸,因为正在创建 82 个线程并且应用程序 运行 没有线程,我使用的解决方案是一个信号量来限制线程数:

let semaphore = DispatchSemaphore(value: 8)
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
        semaphore.signal()
    }
    
    semaphore.wait()
}

编辑: Rob 的回答解释了上述代码的一些问题

很高兴您找到了线程爆炸的解决方案。请参阅有关线程爆炸的讨论 WWDC 2015 Building Responsive and Efficient Apps with GCD and again in WWDC 2016 Concurrent Programming With GCD in Swift 3

话虽这么说,DispatchSemaphore 现在有点反模式,因为存在 concurrentPerform(或 OperationQueue 及其 maxConcurrentOperationCount 或结合其 maxPublishers)。所有这些都比分派信号量更优雅地管理并发度。

说了这么多,关于 的一些观察:

  1. 使用此 DispatchSemaphore 模式时,通常将 wait 放在 concurrent.async { ... } 之前(因为,作为写,你得到九个并发操作,而不是八个,这有点误导)。

  2. 这里更深层次的问题是你已经减少了计数问题,但它仍然存在。考虑:

    let threadSafeArray = SynchronizedArray<Int>()
    
    let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    print(threadSafeArray.get.count)
    

    当你离开 for 循环时,你仍然可以在 concurrent 上有最多八个异步任务仍然 运行,并且 count(与关于 concurrent 队列)仍然可以少于 10,000。您必须添加另一个 concurrent.async(flags: .barrier) { ... },这只是添加第二层同步。例如

    let semaphore = DispatchSemaphore(value: 8)
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    concurrent.async(flags: .barrier) {
        print(threadSafeArray.get.count)
    }
    

    或者您可以使用 DispatchGroup,用于确定一系列异步调度块何时完成的经典机制:

    let semaphore = DispatchSemaphore(value: 8)
    let group = DispatchGroup()
    
    for i in 0..<10000 {
        semaphore.wait()
        concurrent.async(group: group) {
            threadSafeArray.append(newElement: i)
            semaphore.signal()
        }
    }
    
    group.notify(queue: .main) {
        print(threadSafeArray.get.count)
    }
    

    使用 concurrentPerform 消除了对这两种模式的需要,因为在所有并发任务完成之前它不会继续执行。 (它还会根据您设备的核心数自动优化并发度。)

  3. FWIW,SynchronizedArray 的一个更好的替代方法是根本不公开底层数组,只实现您想要公开的任何方法,集成必要的同步。它使调用站点更干净,并解决了许多问题。

    例如,假设您想公开下标运算符和一个 count 变量,您可以这样做:

    class SynchronizedArray<T> {
        private var array: [T]
        private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
    
        init(_ array: [T] = []) {
            self.array = array
        }
    
        subscript(index: Int) -> T {
            get { reader { [=13=][index] } }
            set { writer { [=13=][index] = newValue } }
        }
    
        var count: Int {
            reader { [=13=].count }
        }
    
        func append(newElement: T) {
            writer { [=13=].append(newElement) }
        }
    
        func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
            try accessQueue.sync { try block(array) }
        }
    
        func writer(_ block: @escaping (inout [T]) -> Void) {
            accessQueue.async(flags: .barrier) { block(&self.array) }
        }
    }
    

    这解决了各种问题。例如,您现在可以:

    print(threadSafeArray.count) // get the count
    print(threadSafeArray[500])  // get the 500th item
    

    您现在还可以执行以下操作:

    let average = threadSafeArray.reader { array -> Double in
        let sum = array.reduce(0, +)
        return Double(sum) / Double(array.count)
    }
    

    但是,归根结底,在处理集合(或任何可变对象)时,您总是不想公开可变对象本身,而是为常见操作编写自己的同步方法(下标,countremoveAll 等),并且在应用程序开发人员可能需要更广泛的同步机制的情况下,还可能公开 reader/writer 接口。

    (FWIW,对此 SynchronizedArray 的更改适用于信号量或 concurrentPerform 场景;只是信号量恰好在这种情况下显示了问题。)

  4. 不用说,您通常也会在每个线程上完成更多工作,因为尽管上下文切换开销不大,但它很可能足以抵消并行处理带来的任何优势. (但我知道这可能只是问题的概念性演示,而不是建议的实施。)仅供未来读者参考。