F# 异步:parent/child 取消?

F# async: parent/child cancellation?

所以我们开始:给定一个 Confluent.Kafka IConsumer<>,它会将其包装到专用的 async CE 中,并在未请求取消时消耗。这段代码还针对 OperationCancelledException 进行自我防御并运行 finally 块以确保消费者正常终止。

let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
    async {
            let! ct = Async.CancellationToken
            try
                try
                    while not ct.IsCancellationRequested do 
                        let consumeResult = consumer.Consume(ct)
                        if not consumeResult.IsPartitionEOF then do! (callback consumeResult) 
                with
                | :? OperationCanceledException -> return ()
            finally
                consumer.Close()
                consumer.Dispose()
    }

问题 #1: 此代码是否正确,还是我在滥用 async

到目前为止一切顺利。在我的应用程序中,我必须处理许多 必须 完全死亡的消费者。所以,假设 consumers: seq<Async<unit>> 代表它们,下面的代码就是我想出的:

async {
        for consumer in consumers do 
            do! (Async.StartChild consumer |> Async.Ignore).
    }

我希望此代码将子项链接到父项的取消上下文,并且一旦取消,子项也将被取消。

问题 #2: 即使 child 被取消,我的 finally block gua运行 是否会 运行?

我对您的代码有两个看法:

  • 您对 Async.StartChild 的使用是正确的 - 所有子计算都将继承相同的取消令牌,并且当主令牌被取消时它们都会被取消。

  • 可以在调用 consumer.Consume(ct) 之后和调用 callback 之前取消异步工作流。我不确定这对您的特定问题意味着什么,但如果它从队列中删除了一些数据,则数据可能会在处理之前丢失。如果这是一个问题,那么我认为您需要使 callback 成为非异步的,或者以不同的方式调用它。

  • 在您的 consumeUntilCancelled 函数中,您不需要明确检查 ct.IsCancellationRequested 是否为真。异步工作流在每个 do!let! 中自动执行此操作,因此您可以仅用 while 循环替换它。

这是一个最小的独立演示:

let consume s = async {
  try
    while true do 
      do! Async.Sleep 1000
      printfn "%s did work" s
  finally
    printfn "%s finalized" s }

let work = 
  async {
    for c in ["A"; "B"; "C"; "D"] do  
      do! Async.StartChild (consume c) |> Async.Ignore }

现在我们使用取消令牌创建计算:

// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)

// Run this sometime later
ct.Cancel()

一旦你调用了ct.Cancel,所有的finally块都会被调用,所有的循环都会停止。