具有依赖性的异步任务执行

Async tasks execution with dependency

情况:

我有 2 个任务在异步后台模式下说 T1T2T2 依赖于 T1 并且有 successBlock 在两个任务完成后执行 T1 & T2.

为了更好地理解,下面是快速图表。

编辑:

为了更好地理解任务,您可以假设 T1 和 T2 是 API 调用,它们总是在异步模式下执行。我需要一些来自 T1 的输出数据来命中 T2 API。完成这两项任务后,我需要更新 UI.


为了完成这个场景,我在 T1 中添加了我的第一个异步工作,在 T2 中添加了第二个工作,并且依赖于 T2T1 和 successblock 对这两个任务都有依赖性。

代码工作

  1. 我的任务

    class TaskManager {
    
        static let shared = TaskManager()
    
        func task1Call(complete: @escaping ()->()) {
            DispatchQueue.global(qos: .background).async {
                for i in 0...10 {
                    print("~~> Task 1 Executing ..", i)
                    sleep(1)
                }
                complete()
            }
        }
    
        func task2Call(complete: @escaping ()->()) {
            DispatchQueue.global(qos: .background).async {
                for i in 0...10 {
                    print("==> Task 2 Executing ..", i)
                    sleep(1)
                }
                complete()
            }
        }
    }
    
  2. 执行任务

    class Execution {
    
        // Managing tasks with OperationQueue
        func executeTaskWithOperation()  {
    
            let t1 = BlockOperation {
                TaskManager.shared.task1Call {
                    print("Task 1 Completed")
                }
            }
    
            let t2 = BlockOperation {
                TaskManager.shared.task2Call {
                    print("Task 2 Completed")
                }
            }
    
            let successBlock = BlockOperation {
                print("Tasks Completed")
            }
    
            let oper = OperationQueue()
    
            t2.addDependency(t1)
            successBlock.addDependency(t2)
            successBlock.addDependency(t1)
    
            oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
    
        }
    }
    
    let e = Execution()
    e.executeTaskWithOperation()
    

问题:

两个任务并行执行,successBlock 在任务 1 和任务 2 完成之前执行。

控制台输出:

==> Task 2 Executing .. 0
Tasks Completed
~~> Task 1 Executing .. 0
~~> Task 1 Executing .. 1
==> Task 2 Executing .. 1
==> Task 2 Executing .. 2
~~> Task 1 Executing .. 2
==> Task 2 Executing .. 3
~~> Task 1 Executing .. 3
==> Task 2 Executing .. 4
~~> Task 1 Executing .. 4
==> Task 2 Executing .. 5
~~> Task 1 Executing .. 5
==> Task 2 Executing .. 6
~~> Task 1 Executing .. 6
==> Task 2 Executing .. 7
~~> Task 1 Executing .. 7
==> Task 2 Executing .. 8
~~> Task 1 Executing .. 8
==> Task 2 Executing .. 9
~~> Task 1 Executing .. 9
~~> Task 1 Executing .. 10
==> Task 2 Executing .. 10
Task 1 Completed
Task 2 Completed

我无法弄清楚我做错了什么,当我使用同步模式而不是异步模式时,即使是相同的代码也能正常工作。

您的 t1t2 是产生后台线程的块操作(每个都执行一些打印然后退出,但这并不重要)。一旦它们完成产卵,它们就被认为已经完成。 successBlock 取决于生成的两个后台线程,然后就完成了。您想要 BlockOperation 本身的工作:

class Execution {

    // Managing tasks with OperationQueue
    func executeTaskWithOperation()  {

      let t1 = BlockOperation {
          for i in 0...10 {
              print("~~> Task 1 Executing ..", i)
              sleep(1)
          }
          print("Task 1 completed")
        }

        let t2 = BlockOperation {
          for i in 0...10 {
            print("==> Task 2 Executing ..", i)
            sleep(1)
          }
         print("Task 2 Completed")
        }

        let successBlock = BlockOperation {
            print("Tasks Completed")
        }

        let oper = OperationQueue()

        t2.addDependency(t1)  // Remove this to see concurrent exec of t1 and t2
        successBlock.addDependency(t2)
        successBlock.addDependency(t1)

        oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)

    }
}

let e = Execution()
e.executeTaskWithOperation()

编辑:为了在后台线程上执行,覆盖 Operation

class AsyncOp: Operation {
  let task: String
  var running = false
  var done = false

  init(_ task: String) {
    self.task = task
  }

  override var isAsynchronous: Bool { true }
  override var isExecuting: Bool {
    get { running }
    set {
      willChangeValue(forKey: "isExecuting")
      running = newValue
      didChangeValue(forKey: "isExecuting")
    }
  }
  override var isFinished: Bool {
    get { done }
    set {
      willChangeValue(forKey: "isFinished")
      done = newValue
      didChangeValue(forKey: "isFinished")
    }
  }

  override func main() {
    DispatchQueue.global(qos: .background).async {
      self.isExecuting = true
      for i in 0...10 {
        print("\(self.task) Executing ..", i)
        sleep(1)
      }
      print("Done")
      self.isExecuting = false
      self.isFinished = true
    }
  }

  override func start() {
    print("\(task) starting")
    main()
  }
}

class Execution {
  // Managing tasks with OperationQueue
  func executeTaskWithOperation()  {
    let t1 = AsyncOp("task1")
    let t2 = AsyncOp("task2")
    let successBlock = BlockOperation {
      print("Tasks Completed")
    }

    let oper = OperationQueue()
    t2.addDependency(t1)
    successBlock.addDependency(t2)
    successBlock.addDependency(t1)
    oper.addOperations([t1, t2, successBlock], waitUntilFinished: true)
  }
}

let e = Execution()
e.executeTaskWithOperation()

之后,我可以得出答案。


执行从 OperationQueue 更改为 DispatchGroupDispatchSemaphore

DispatchGroup : 它确保两个任务都已完成,然后调用 notify 块。

DispatchSemaphore : 它使用 wait 命令保存异步资源,直到我们不会发送 signal 命令,即我们对信号量说要等到 task1 未完成。

任务示例代码。

class Execution {
    // Managing tasks with DispatchGroup

    func executeTaskWithGroup() {
        let groups = DispatchGroup()
        let semaphore = DispatchSemaphore(value: 1)
        groups.enter()
        semaphore.wait()
        TaskManager.shared.task1Call {
            groups.leave()
            semaphore.signal()
        }

        groups.enter()
        TaskManager.shared.task2Call {
            groups.leave()
        }

        groups.notify(queue: DispatchQueue.global(qos: .background)) {
            print("Tasks Completed")
        }

    }

}

要执行命令,我们需要做的就是。

let e = Execution()
e.executeTaskWithGroup()

但是上面的代码在主线程中执行并阻塞了UI。为了防止这种情况,您需要在后台队列中调用上面的代码,如下所示。

let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)

queue.async {
    let e = Execution()
    e.executeTaskWithGroup()
}

现在一切正常,符合我的需要。


插件

以防万一,如果有人要求在上述情况下调用多个 API,则将您的任务异步添加到队列中。

let queue = DispatchQueue.init(label: "MyQueue", qos: .background, attributes: .concurrent, autoreleaseFrequency: .workItem, target: nil)

queue.async {
    let e1 = Execution()
    e1.executeTaskWithGroup()
}

queue.async {
    let e2 = Execution()
    e2.executeTaskWithGroup()
}

现在 e1e2 可以并行执行,不会阻塞主线程。


参考资料:

Dexecutor 在这里救援

免责声明:我是 Dexecutor 的所有者

Dexecutor 可以很容易地用于 workflow 等用例

这里是sample Application