具有自定义 maxConcurrentOperationCount 的 OperationQueue 在完成第一个操作后不会拾取/执行队列中的所有操作

OperationQueue with custom `maxConcurrentOperationCount` does not pick up / execute all operations in the queue, after finishing first operation

我确定我的逻辑有问题,只是想不通是什么。

有一个“服务”class,它有一个操作队列:

class Service {

    let queue: OperationQueue = {

        var queue = OperationQueue()
        queue.name = "my.operationQueue"
        queue.maxConcurrentOperationCount = 1
        return queue
    }()

    func add(operation: Operation) {
        queue.addOperation(operation)
    }
}

该操作是异步的,因此它会覆盖状态,start 函数:

class MyOp: Operation {

    private var state: State = .ready
    private var id: Int

    init(id: Int) {
        self.id = id
    }

    override var isAsynchronous: Bool {
        return true
    }

    override var isReady: Bool {
        return state == .ready
    }

    override var isExecuting: Bool {
        return state == .started
    }

    /// See: `Operation`
    override var isFinished: Bool {
        return state == .finished || state == .cancelled
    }

    /// See: `Operation`
    override var isCancelled: Bool {
        return state == .cancelled
    }

    override func start() {

        guard state == .ready else {
            return
        }

        state = .started
        print("\(Date()) started \(id)")

        DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
            self.state = .finished
            print("\(Date()) finished \(self.id)")
        }
    }
}

private extension MyOp {

    enum State {

        case ready
        case started
        case cancelled
        case finished
    }
}

我正在向队列中添加多个操作(使用 concurrentPerform 进行测试,实际上是不同的):

let iterations = 20
let service = Service()

DispatchQueue.concurrentPerform(iterations: iterations) { iteration in

    let operation = MyOp(id: iteration)
    service.add(operation: operation)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 40) {
    print("\(Date()) after run \(String(describing: service.queue.operations))")
}

我期待什么

实际发生了什么

操作已按预期添加到队列中。

我看到只有 1 个操作开始和完成,其余操作从未开始。最后一个块,在添加所有操作后 40 秒打印队列内容(大致足以完成所有或几乎所有操作的时间),显示剩余操作仍在队列中,而不是 运行ning。这是一个例子:

<NSOperationQueue: 0x7fd477f09460>{name = 'my.operationQueue'}
2022-03-23 21:05:51 +0000 started 11
2022-03-23 21:05:53 +0000 finished 11
2022-03-23 21:06:31 +0000 after run [
  <__lldb_expr_25.MyOp 0x7fd479406660 isFinished=YES isReady=NO isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd477f04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479206a70 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460904190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479004080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479406550 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460804080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd470904480 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460904080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460804190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460a04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4793068c0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460b04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd477f0a160 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460a04190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479406770 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4608042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4792092f0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd47910a360 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4609042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>
 ]

那我做错了什么?

注:

更新:我将问题提炼为maxConcurrentOperationCount:如果我删除行queue.maxConcurrentOperationCount = 1,队列将按预期工作。将其设置为任何其他值,都会产生类似的问题。

还是不明白为什么不对。

问题是这些方法不 KVC/KVO 兼容。正如 Operation documentation 所说:

The NSOperation class is key-value coding (KVC) and key-value observing (KVO) compliant for several of its properties.

If you provide custom implementations for any of the preceding properties, your implementations must maintain KVC and KVO compliance.

并发程度的约束(例如,两者都 maxConcurrentOperationCount and addDependency(_:))依赖于 KVO 来了解先前的操作何时完成。如果您未能执行所需的 KVO 通知,队列将不知道后续操作何时可以进行。

有关示例实现,请参阅 的后半部分。


FWIW,这是一个异步操作实现:

public class AsynchronousOperation: Operation {

    @Atomic @objc private dynamic var state: OperationState = .ready

    // MARK: - Various `Operation` properties

    open         override var isReady:        Bool { state == .ready && super.isReady }
    public final override var isExecuting:    Bool { state == .executing }
    public final override var isFinished:     Bool { state == .finished }
    public final override var isAsynchronous: Bool { true }

    // KVO for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if [#keyPath(isReady), #keyPath(isFinished), #keyPath(isExecuting)].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // Start

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function throws an exception.

    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing

    public final func finish() {
        if !isFinished { state = .finished }
    }
}

private extension AsynchronousOperation {
    /// State for this operation.

    @objc enum OperationState: Int {
        case ready
        case executing
        case finished
    }
}

具有以下内容:

@propertyWrapper
public class Atomic<T> {
    private var _wrappedValue: T
    private var lock = NSLock()

    public var wrappedValue: T {
        get { lock.synchronized { _wrappedValue } }
        set { lock.synchronized { _wrappedValue = newValue } }
    }

    public init(wrappedValue: T) {
        _wrappedValue = wrappedValue
    }
}

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

通过上面的内容,我将异步 Operation 代码抽象为可以子类化并继承异步行为的代码。例如,这是一个与您的示例执行相同 asyncAfter 的操作(但有一些额外的 OSLog 路标,因此我可以直观地看到 Instruments 中的操作):

import os.log

private let log = OSLog(subsystem: "Op", category: .pointsOfInterest)

class MyOperation: AsynchronousOperation {
    var value: Int

    init(value: Int) {
        self.value = value
        super.init()
    }

    override func main() {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Operation", signpostID: id, "%d", value)

        DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [self] in
            finish()
            os_signpost(.end, log: log, name: "Operation", signpostID: id, "%d", value)
        }
    }
}

然后...

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1

for i in 0..<5 {
    queue.addOperation(MyOperation(value: i))
}

... 生成如下操作的时间表: