具有依赖性的异步任务执行
Async tasks execution with dependency
情况:
我有 2 个任务在异步后台模式下说 T1 和 T2。 T2 依赖于 T1 并且有 successBlock 在两个任务完成后执行 T1 & T2.
为了更好地理解,下面是快速图表。
编辑:
为了更好地理解任务,您可以假设 T1 和 T2 是 API 调用,它们总是在异步模式下执行。我需要一些来自 T1 的输出数据来命中 T2 API。完成这两项任务后,我需要更新 UI.
为了完成这个场景,我在 T1 中添加了我的第一个异步工作,在 T2 中添加了第二个工作,并且依赖于 T2 到 T1 和 successblock 对这两个任务都有依赖性。
代码工作
我的任务
class TaskManager {
static let shared = TaskManager()
func task1Call(complete: @escaping ()->()) {
DispatchQueue.global(qos: .background).async {
for i in 0...10 {
print("~~> Task 1 Executing ..", i)
sleep(1)
}
complete()
}
}
func task2Call(complete: @escaping ()->()) {
DispatchQueue.global(qos: .background).async {
for i in 0...10 {
print("==> Task 2 Executing ..", i)
sleep(1)
}
complete()
}
}
}
执行任务
class Execution {
// Managing tasks with OperationQueue
func executeTaskWithOperation() {
let t1 = BlockOperation {
TaskManager.shared.task1Call {
print("Task 1 Completed")
}
}
let t2 = BlockOperation {
TaskManager.shared.task2Call {
print("Task 2 Completed")
}
}
let successBlock = BlockOperation {
print("Tasks Completed")
}
let oper = OperationQueue()
t2.addDependency(t1)
successBlock.addDependency(t2)
successBlock.addDependency(t1)
oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
}
}
let e = Execution()
e.executeTaskWithOperation()
问题:
两个任务并行执行,successBlock 在任务 1 和任务 2 完成之前执行。
控制台输出:
==> Task 2 Executing .. 0
Tasks Completed
~~> Task 1 Executing .. 0
~~> Task 1 Executing .. 1
==> Task 2 Executing .. 1
==> Task 2 Executing .. 2
~~> Task 1 Executing .. 2
==> Task 2 Executing .. 3
~~> Task 1 Executing .. 3
==> Task 2 Executing .. 4
~~> Task 1 Executing .. 4
==> Task 2 Executing .. 5
~~> Task 1 Executing .. 5
==> Task 2 Executing .. 6
~~> Task 1 Executing .. 6
==> Task 2 Executing .. 7
~~> Task 1 Executing .. 7
==> Task 2 Executing .. 8
~~> Task 1 Executing .. 8
==> Task 2 Executing .. 9
~~> Task 1 Executing .. 9
~~> Task 1 Executing .. 10
==> Task 2 Executing .. 10
Task 1 Completed
Task 2 Completed
我无法弄清楚我做错了什么,当我使用同步模式而不是异步模式时,即使是相同的代码也能正常工作。
您的 t1
和 t2
是产生后台线程的块操作(每个都执行一些打印然后退出,但这并不重要)。一旦它们完成产卵,它们就被认为已经完成。 successBlock
取决于生成的两个后台线程,然后就完成了。您想要 BlockOperation
本身的工作:
class Execution {
// Managing tasks with OperationQueue
func executeTaskWithOperation() {
let t1 = BlockOperation {
for i in 0...10 {
print("~~> Task 1 Executing ..", i)
sleep(1)
}
print("Task 1 completed")
}
let t2 = BlockOperation {
for i in 0...10 {
print("==> Task 2 Executing ..", i)
sleep(1)
}
print("Task 2 Completed")
}
let successBlock = BlockOperation {
print("Tasks Completed")
}
let oper = OperationQueue()
t2.addDependency(t1) // Remove this to see concurrent exec of t1 and t2
successBlock.addDependency(t2)
successBlock.addDependency(t1)
oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
}
}
let e = Execution()
e.executeTaskWithOperation()
编辑:为了在后台线程上执行,覆盖 Operation
。
class AsyncOp: Operation {
let task: String
var running = false
var done = false
init(_ task: String) {
self.task = task
}
override var isAsynchronous: Bool { true }
override var isExecuting: Bool {
get { running }
set {
willChangeValue(forKey: "isExecuting")
running = newValue
didChangeValue(forKey: "isExecuting")
}
}
override var isFinished: Bool {
get { done }
set {
willChangeValue(forKey: "isFinished")
done = newValue
didChangeValue(forKey: "isFinished")
}
}
override func main() {
DispatchQueue.global(qos: .background).async {
self.isExecuting = true
for i in 0...10 {
print("\(self.task) Executing ..", i)
sleep(1)
}
print("Done")
self.isExecuting = false
self.isFinished = true
}
}
override func start() {
print("\(task) starting")
main()
}
}
class Execution {
// Managing tasks with OperationQueue
func executeTaskWithOperation() {
let t1 = AsyncOp("task1")
let t2 = AsyncOp("task2")
let successBlock = BlockOperation {
print("Tasks Completed")
}
let oper = OperationQueue()
t2.addDependency(t1)
successBlock.addDependency(t2)
successBlock.addDependency(t1)
oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
}
}
let e = Execution()
e.executeTaskWithOperation()
在之后,我可以得出答案。
执行从 OperationQueue
更改为 DispatchGroup
和 DispatchSemaphore
。
DispatchGroup : 它确保两个任务都已完成,然后调用 notify
块。
DispatchSemaphore : 它使用 wait 命令保存异步资源,直到我们不会发送 signal 命令,即我们对信号量说要等到 task1 未完成。
任务示例代码。
class Execution {
// Managing tasks with DispatchGroup
func executeTaskWithGroup() {
let groups = DispatchGroup()
let semaphore = DispatchSemaphore(value: 1)
groups.enter()
semaphore.wait()
TaskManager.shared.task1Call {
groups.leave()
semaphore.signal()
}
groups.enter()
TaskManager.shared.task2Call {
groups.leave()
}
groups.notify(queue: DispatchQueue.global(qos: .background)) {
print("Tasks Completed")
}
}
}
要执行命令,我们需要做的就是。
let e = Execution()
e.executeTaskWithGroup()
但是上面的代码在主线程中执行并阻塞了UI。为了防止这种情况,您需要在后台队列中调用上面的代码,如下所示。
let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
queue.async {
let e = Execution()
e.executeTaskWithGroup()
}
现在一切正常,符合我的需要。
插件
以防万一,如果有人要求在上述情况下调用多个 API,则将您的任务异步添加到队列中。
let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
queue.async {
let e1 = Execution()
e1.executeTaskWithGroup()
}
queue.async {
let e2 = Execution()
e2.executeTaskWithGroup()
}
现在 e1 和 e2 可以并行执行,不会阻塞主线程。
参考资料:
Dexecutor 在这里救援
免责声明:我是 Dexecutor 的所有者
Dexecutor 可以很容易地用于 workflow 等用例
情况:
我有 2 个任务在异步后台模式下说 T1 和 T2。 T2 依赖于 T1 并且有 successBlock 在两个任务完成后执行 T1 & T2.
为了更好地理解,下面是快速图表。
编辑:
为了更好地理解任务,您可以假设 T1 和 T2 是 API 调用,它们总是在异步模式下执行。我需要一些来自 T1 的输出数据来命中 T2 API。完成这两项任务后,我需要更新 UI.
为了完成这个场景,我在 T1 中添加了我的第一个异步工作,在 T2 中添加了第二个工作,并且依赖于 T2 到 T1 和 successblock 对这两个任务都有依赖性。
代码工作
我的任务
class TaskManager { static let shared = TaskManager() func task1Call(complete: @escaping ()->()) { DispatchQueue.global(qos: .background).async { for i in 0...10 { print("~~> Task 1 Executing ..", i) sleep(1) } complete() } } func task2Call(complete: @escaping ()->()) { DispatchQueue.global(qos: .background).async { for i in 0...10 { print("==> Task 2 Executing ..", i) sleep(1) } complete() } } }
执行任务
class Execution { // Managing tasks with OperationQueue func executeTaskWithOperation() { let t1 = BlockOperation { TaskManager.shared.task1Call { print("Task 1 Completed") } } let t2 = BlockOperation { TaskManager.shared.task2Call { print("Task 2 Completed") } } let successBlock = BlockOperation { print("Tasks Completed") } let oper = OperationQueue() t2.addDependency(t1) successBlock.addDependency(t2) successBlock.addDependency(t1) oper.addOperations([t1, t2, successBlock], waitUntilFinished: true) } } let e = Execution() e.executeTaskWithOperation()
问题:
两个任务并行执行,successBlock 在任务 1 和任务 2 完成之前执行。
控制台输出:
==> Task 2 Executing .. 0
Tasks Completed
~~> Task 1 Executing .. 0
~~> Task 1 Executing .. 1
==> Task 2 Executing .. 1
==> Task 2 Executing .. 2
~~> Task 1 Executing .. 2
==> Task 2 Executing .. 3
~~> Task 1 Executing .. 3
==> Task 2 Executing .. 4
~~> Task 1 Executing .. 4
==> Task 2 Executing .. 5
~~> Task 1 Executing .. 5
==> Task 2 Executing .. 6
~~> Task 1 Executing .. 6
==> Task 2 Executing .. 7
~~> Task 1 Executing .. 7
==> Task 2 Executing .. 8
~~> Task 1 Executing .. 8
==> Task 2 Executing .. 9
~~> Task 1 Executing .. 9
~~> Task 1 Executing .. 10
==> Task 2 Executing .. 10
Task 1 Completed
Task 2 Completed
我无法弄清楚我做错了什么,当我使用同步模式而不是异步模式时,即使是相同的代码也能正常工作。
您的 t1
和 t2
是产生后台线程的块操作(每个都执行一些打印然后退出,但这并不重要)。一旦它们完成产卵,它们就被认为已经完成。 successBlock
取决于生成的两个后台线程,然后就完成了。您想要 BlockOperation
本身的工作:
class Execution {
// Managing tasks with OperationQueue
func executeTaskWithOperation() {
let t1 = BlockOperation {
for i in 0...10 {
print("~~> Task 1 Executing ..", i)
sleep(1)
}
print("Task 1 completed")
}
let t2 = BlockOperation {
for i in 0...10 {
print("==> Task 2 Executing ..", i)
sleep(1)
}
print("Task 2 Completed")
}
let successBlock = BlockOperation {
print("Tasks Completed")
}
let oper = OperationQueue()
t2.addDependency(t1) // Remove this to see concurrent exec of t1 and t2
successBlock.addDependency(t2)
successBlock.addDependency(t1)
oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
}
}
let e = Execution()
e.executeTaskWithOperation()
编辑:为了在后台线程上执行,覆盖 Operation
。
class AsyncOp: Operation {
let task: String
var running = false
var done = false
init(_ task: String) {
self.task = task
}
override var isAsynchronous: Bool { true }
override var isExecuting: Bool {
get { running }
set {
willChangeValue(forKey: "isExecuting")
running = newValue
didChangeValue(forKey: "isExecuting")
}
}
override var isFinished: Bool {
get { done }
set {
willChangeValue(forKey: "isFinished")
done = newValue
didChangeValue(forKey: "isFinished")
}
}
override func main() {
DispatchQueue.global(qos: .background).async {
self.isExecuting = true
for i in 0...10 {
print("\(self.task) Executing ..", i)
sleep(1)
}
print("Done")
self.isExecuting = false
self.isFinished = true
}
}
override func start() {
print("\(task) starting")
main()
}
}
class Execution {
// Managing tasks with OperationQueue
func executeTaskWithOperation() {
let t1 = AsyncOp("task1")
let t2 = AsyncOp("task2")
let successBlock = BlockOperation {
print("Tasks Completed")
}
let oper = OperationQueue()
t2.addDependency(t1)
successBlock.addDependency(t2)
successBlock.addDependency(t1)
oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
}
}
let e = Execution()
e.executeTaskWithOperation()
在
执行从 OperationQueue
更改为 DispatchGroup
和 DispatchSemaphore
。
DispatchGroup : 它确保两个任务都已完成,然后调用 notify
块。
DispatchSemaphore : 它使用 wait 命令保存异步资源,直到我们不会发送 signal 命令,即我们对信号量说要等到 task1 未完成。
任务示例代码。
class Execution {
// Managing tasks with DispatchGroup
func executeTaskWithGroup() {
let groups = DispatchGroup()
let semaphore = DispatchSemaphore(value: 1)
groups.enter()
semaphore.wait()
TaskManager.shared.task1Call {
groups.leave()
semaphore.signal()
}
groups.enter()
TaskManager.shared.task2Call {
groups.leave()
}
groups.notify(queue: DispatchQueue.global(qos: .background)) {
print("Tasks Completed")
}
}
}
要执行命令,我们需要做的就是。
let e = Execution()
e.executeTaskWithGroup()
但是上面的代码在主线程中执行并阻塞了UI。为了防止这种情况,您需要在后台队列中调用上面的代码,如下所示。
let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
queue.async {
let e = Execution()
e.executeTaskWithGroup()
}
现在一切正常,符合我的需要。
插件
以防万一,如果有人要求在上述情况下调用多个 API,则将您的任务异步添加到队列中。
let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)
queue.async {
let e1 = Execution()
e1.executeTaskWithGroup()
}
queue.async {
let e2 = Execution()
e2.executeTaskWithGroup()
}
现在 e1 和 e2 可以并行执行,不会阻塞主线程。
参考资料:
Dexecutor 在这里救援
免责声明:我是 Dexecutor 的所有者
Dexecutor 可以很容易地用于 workflow 等用例