具有自定义 maxConcurrentOperationCount 的 OperationQueue 在完成第一个操作后不会拾取/执行队列中的所有操作
OperationQueue with custom `maxConcurrentOperationCount` does not pick up / execute all operations in the queue, after finishing first operation
我确定我的逻辑有问题,只是想不通是什么。
有一个“服务”class,它有一个操作队列:
class Service {
let queue: OperationQueue = {
var queue = OperationQueue()
queue.name = "my.operationQueue"
queue.maxConcurrentOperationCount = 1
return queue
}()
func add(operation: Operation) {
queue.addOperation(operation)
}
}
该操作是异步的,因此它会覆盖状态,start
函数:
class MyOp: Operation {
private var state: State = .ready
private var id: Int
init(id: Int) {
self.id = id
}
override var isAsynchronous: Bool {
return true
}
override var isReady: Bool {
return state == .ready
}
override var isExecuting: Bool {
return state == .started
}
/// See: `Operation`
override var isFinished: Bool {
return state == .finished || state == .cancelled
}
/// See: `Operation`
override var isCancelled: Bool {
return state == .cancelled
}
override func start() {
guard state == .ready else {
return
}
state = .started
print("\(Date()) started \(id)")
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
self.state = .finished
print("\(Date()) finished \(self.id)")
}
}
}
private extension MyOp {
enum State {
case ready
case started
case cancelled
case finished
}
}
我正在向队列中添加多个操作(使用 concurrentPerform
进行测试,实际上是不同的):
let iterations = 20
let service = Service()
DispatchQueue.concurrentPerform(iterations: iterations) { iteration in
let operation = MyOp(id: iteration)
service.add(operation: operation)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 40) {
print("\(Date()) after run \(String(describing: service.queue.operations))")
}
我期待什么
- 20 个操作被添加到队列中(因为
let iterations = 20
)
- 1 操作立即开始运行,其他人在队列中等待(因为
queue.maxConcurrentOperationCount = 1
)
- 第一个操作完成后,第二个开始,依此类推。
- 打印队列内容的最后一个块不应包含任何项目,或最多 1-2 个剩余项目。
实际发生了什么
操作已按预期添加到队列中。
我看到只有 1 个操作开始和完成,其余操作从未开始。最后一个块,在添加所有操作后 40 秒打印队列内容(大致足以完成所有或几乎所有操作的时间),显示剩余操作仍在队列中,而不是 运行ning。这是一个例子:
<NSOperationQueue: 0x7fd477f09460>{name = 'my.operationQueue'}
2022-03-23 21:05:51 +0000 started 11
2022-03-23 21:05:53 +0000 finished 11
2022-03-23 21:06:31 +0000 after run [
<__lldb_expr_25.MyOp 0x7fd479406660 isFinished=YES isReady=NO isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd477f04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479206a70 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460904190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479004080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479406550 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460804080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd470904480 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460904080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460804190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460a04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4793068c0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460b04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd477f0a160 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460a04190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479406770 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4608042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4792092f0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd47910a360 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4609042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>
]
那我做错了什么?
注:
- 这不是
print
错误的问题,因为在实际代码中我没有使用它
- 同样在实际代码中没有
DispatchQueue.global().asyncAfter(deadline: .now() + 2)
- 这只是为了模拟一个 运行ning 异步操作。
更新:我将问题提炼为maxConcurrentOperationCount
:如果我删除行queue.maxConcurrentOperationCount = 1
,队列将按预期工作。将其设置为任何其他值,都会产生类似的问题。
还是不明白为什么不对。
问题是这些方法不 KVC/KVO 兼容。正如 Operation
documentation 所说:
The NSOperation
class is key-value coding (KVC) and key-value observing (KVO) compliant for several of its properties.
…
If you provide custom implementations for any of the preceding properties, your implementations must maintain KVC and KVO compliance.
并发程度的约束(例如,两者都 maxConcurrentOperationCount
and addDependency(_:)
)依赖于 KVO 来了解先前的操作何时完成。如果您未能执行所需的 KVO 通知,队列将不知道后续操作何时可以进行。
有关示例实现,请参阅 的后半部分。
FWIW,这是一个异步操作实现:
public class AsynchronousOperation: Operation {
@Atomic @objc private dynamic var state: OperationState = .ready
// MARK: - Various `Operation` properties
open override var isReady: Bool { state == .ready && super.isReady }
public final override var isExecuting: Bool { state == .executing }
public final override var isFinished: Bool { state == .finished }
public final override var isAsynchronous: Bool { true }
// KVO for dependent properties
open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
if [#keyPath(isReady), #keyPath(isFinished), #keyPath(isExecuting)].contains(key) {
return [#keyPath(state)]
}
return super.keyPathsForValuesAffectingValue(forKey: key)
}
// Start
public final override func start() {
if isCancelled {
state = .finished
return
}
state = .executing
main()
}
/// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function throws an exception.
open override func main() {
fatalError("Subclasses must implement `main`.")
}
/// Call this function to finish an operation that is currently executing
public final func finish() {
if !isFinished { state = .finished }
}
}
private extension AsynchronousOperation {
/// State for this operation.
@objc enum OperationState: Int {
case ready
case executing
case finished
}
}
具有以下内容:
@propertyWrapper
public class Atomic<T> {
private var _wrappedValue: T
private var lock = NSLock()
public var wrappedValue: T {
get { lock.synchronized { _wrappedValue } }
set { lock.synchronized { _wrappedValue = newValue } }
}
public init(wrappedValue: T) {
_wrappedValue = wrappedValue
}
}
extension NSLocking {
func synchronized<T>(block: () throws -> T) rethrows -> T {
lock()
defer { unlock() }
return try block()
}
}
通过上面的内容,我将异步 Operation
代码抽象为可以子类化并继承异步行为的代码。例如,这是一个与您的示例执行相同 asyncAfter
的操作(但有一些额外的 OSLog
路标,因此我可以直观地看到 Instruments 中的操作):
import os.log
private let log = OSLog(subsystem: "Op", category: .pointsOfInterest)
class MyOperation: AsynchronousOperation {
var value: Int
init(value: Int) {
self.value = value
super.init()
}
override func main() {
let id = OSSignpostID(log: log)
os_signpost(.begin, log: log, name: "Operation", signpostID: id, "%d", value)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [self] in
finish()
os_signpost(.end, log: log, name: "Operation", signpostID: id, "%d", value)
}
}
}
然后...
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
for i in 0..<5 {
queue.addOperation(MyOperation(value: i))
}
... 生成如下操作的时间表:
我确定我的逻辑有问题,只是想不通是什么。
有一个“服务”class,它有一个操作队列:
class Service {
let queue: OperationQueue = {
var queue = OperationQueue()
queue.name = "my.operationQueue"
queue.maxConcurrentOperationCount = 1
return queue
}()
func add(operation: Operation) {
queue.addOperation(operation)
}
}
该操作是异步的,因此它会覆盖状态,start
函数:
class MyOp: Operation {
private var state: State = .ready
private var id: Int
init(id: Int) {
self.id = id
}
override var isAsynchronous: Bool {
return true
}
override var isReady: Bool {
return state == .ready
}
override var isExecuting: Bool {
return state == .started
}
/// See: `Operation`
override var isFinished: Bool {
return state == .finished || state == .cancelled
}
/// See: `Operation`
override var isCancelled: Bool {
return state == .cancelled
}
override func start() {
guard state == .ready else {
return
}
state = .started
print("\(Date()) started \(id)")
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
self.state = .finished
print("\(Date()) finished \(self.id)")
}
}
}
private extension MyOp {
enum State {
case ready
case started
case cancelled
case finished
}
}
我正在向队列中添加多个操作(使用 concurrentPerform
进行测试,实际上是不同的):
let iterations = 20
let service = Service()
DispatchQueue.concurrentPerform(iterations: iterations) { iteration in
let operation = MyOp(id: iteration)
service.add(operation: operation)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 40) {
print("\(Date()) after run \(String(describing: service.queue.operations))")
}
我期待什么
- 20 个操作被添加到队列中(因为
let iterations = 20
) - 1 操作立即开始运行,其他人在队列中等待(因为
queue.maxConcurrentOperationCount = 1
) - 第一个操作完成后,第二个开始,依此类推。
- 打印队列内容的最后一个块不应包含任何项目,或最多 1-2 个剩余项目。
实际发生了什么
操作已按预期添加到队列中。
我看到只有 1 个操作开始和完成,其余操作从未开始。最后一个块,在添加所有操作后 40 秒打印队列内容(大致足以完成所有或几乎所有操作的时间),显示剩余操作仍在队列中,而不是 运行ning。这是一个例子:
<NSOperationQueue: 0x7fd477f09460>{name = 'my.operationQueue'}
2022-03-23 21:05:51 +0000 started 11
2022-03-23 21:05:53 +0000 finished 11
2022-03-23 21:06:31 +0000 after run [
<__lldb_expr_25.MyOp 0x7fd479406660 isFinished=YES isReady=NO isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd477f04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479206a70 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460904190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479004080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479406550 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460804080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd470904480 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460904080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460804190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460a04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4793068c0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460b04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd477f0a160 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd460a04190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd479406770 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4608042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4792092f0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd47910a360 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>,
<__lldb_expr_25.MyOp 0x7fd4609042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>
]
那我做错了什么?
注:
- 这不是
print
错误的问题,因为在实际代码中我没有使用它 - 同样在实际代码中没有
DispatchQueue.global().asyncAfter(deadline: .now() + 2)
- 这只是为了模拟一个 运行ning 异步操作。
更新:我将问题提炼为maxConcurrentOperationCount
:如果我删除行queue.maxConcurrentOperationCount = 1
,队列将按预期工作。将其设置为任何其他值,都会产生类似的问题。
还是不明白为什么不对。
问题是这些方法不 KVC/KVO 兼容。正如 Operation
documentation 所说:
The
NSOperation
class is key-value coding (KVC) and key-value observing (KVO) compliant for several of its properties.…
If you provide custom implementations for any of the preceding properties, your implementations must maintain KVC and KVO compliance.
并发程度的约束(例如,两者都 maxConcurrentOperationCount
and addDependency(_:)
)依赖于 KVO 来了解先前的操作何时完成。如果您未能执行所需的 KVO 通知,队列将不知道后续操作何时可以进行。
有关示例实现,请参阅
FWIW,这是一个异步操作实现:
public class AsynchronousOperation: Operation {
@Atomic @objc private dynamic var state: OperationState = .ready
// MARK: - Various `Operation` properties
open override var isReady: Bool { state == .ready && super.isReady }
public final override var isExecuting: Bool { state == .executing }
public final override var isFinished: Bool { state == .finished }
public final override var isAsynchronous: Bool { true }
// KVO for dependent properties
open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
if [#keyPath(isReady), #keyPath(isFinished), #keyPath(isExecuting)].contains(key) {
return [#keyPath(state)]
}
return super.keyPathsForValuesAffectingValue(forKey: key)
}
// Start
public final override func start() {
if isCancelled {
state = .finished
return
}
state = .executing
main()
}
/// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function throws an exception.
open override func main() {
fatalError("Subclasses must implement `main`.")
}
/// Call this function to finish an operation that is currently executing
public final func finish() {
if !isFinished { state = .finished }
}
}
private extension AsynchronousOperation {
/// State for this operation.
@objc enum OperationState: Int {
case ready
case executing
case finished
}
}
具有以下内容:
@propertyWrapper
public class Atomic<T> {
private var _wrappedValue: T
private var lock = NSLock()
public var wrappedValue: T {
get { lock.synchronized { _wrappedValue } }
set { lock.synchronized { _wrappedValue = newValue } }
}
public init(wrappedValue: T) {
_wrappedValue = wrappedValue
}
}
extension NSLocking {
func synchronized<T>(block: () throws -> T) rethrows -> T {
lock()
defer { unlock() }
return try block()
}
}
通过上面的内容,我将异步 Operation
代码抽象为可以子类化并继承异步行为的代码。例如,这是一个与您的示例执行相同 asyncAfter
的操作(但有一些额外的 OSLog
路标,因此我可以直观地看到 Instruments 中的操作):
import os.log
private let log = OSLog(subsystem: "Op", category: .pointsOfInterest)
class MyOperation: AsynchronousOperation {
var value: Int
init(value: Int) {
self.value = value
super.init()
}
override func main() {
let id = OSSignpostID(log: log)
os_signpost(.begin, log: log, name: "Operation", signpostID: id, "%d", value)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [self] in
finish()
os_signpost(.end, log: log, name: "Operation", signpostID: id, "%d", value)
}
}
}
然后...
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
for i in 0..<5 {
queue.addOperation(MyOperation(value: i))
}
... 生成如下操作的时间表: