F# 异步:parent/child 取消?
F# async: parent/child cancellation?
所以我们开始:给定一个 Confluent.Kafka
IConsumer<>
,它会将其包装到专用的 async
CE 中,并在未请求取消时消耗。这段代码还针对 OperationCancelledException
进行自我防御并运行 finally
块以确保消费者正常终止。
let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
async {
let! ct = Async.CancellationToken
try
try
while not ct.IsCancellationRequested do
let consumeResult = consumer.Consume(ct)
if not consumeResult.IsPartitionEOF then do! (callback consumeResult)
with
| :? OperationCanceledException -> return ()
finally
consumer.Close()
consumer.Dispose()
}
问题 #1: 此代码是否正确,还是我在滥用 async
?
到目前为止一切顺利。在我的应用程序中,我必须处理许多 必须 完全死亡的消费者。所以,假设 consumers: seq<Async<unit>>
代表它们,下面的代码就是我想出的:
async {
for consumer in consumers do
do! (Async.StartChild consumer |> Async.Ignore).
}
我希望此代码将子项链接到父项的取消上下文,并且一旦取消,子项也将被取消。
问题 #2: 即使 child 被取消,我的 finally block gua运行 是否会 运行?
我对您的代码有两个看法:
您对 Async.StartChild
的使用是正确的 - 所有子计算都将继承相同的取消令牌,并且当主令牌被取消时它们都会被取消。
可以在调用 consumer.Consume(ct)
之后和调用 callback
之前取消异步工作流。我不确定这对您的特定问题意味着什么,但如果它从队列中删除了一些数据,则数据可能会在处理之前丢失。如果这是一个问题,那么我认为您需要使 callback
成为非异步的,或者以不同的方式调用它。
在您的 consumeUntilCancelled
函数中,您不需要明确检查 ct.IsCancellationRequested
是否为真。异步工作流在每个 do!
或 let!
中自动执行此操作,因此您可以仅用 while
循环替换它。
这是一个最小的独立演示:
let consume s = async {
try
while true do
do! Async.Sleep 1000
printfn "%s did work" s
finally
printfn "%s finalized" s }
let work =
async {
for c in ["A"; "B"; "C"; "D"] do
do! Async.StartChild (consume c) |> Async.Ignore }
现在我们使用取消令牌创建计算:
// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)
// Run this sometime later
ct.Cancel()
一旦你调用了ct.Cancel
,所有的finally
块都会被调用,所有的循环都会停止。
所以我们开始:给定一个 Confluent.Kafka
IConsumer<>
,它会将其包装到专用的 async
CE 中,并在未请求取消时消耗。这段代码还针对 OperationCancelledException
进行自我防御并运行 finally
块以确保消费者正常终止。
let private consumeUntiCancelled callback (consumer: IConsumer<'key, 'value>) =
async {
let! ct = Async.CancellationToken
try
try
while not ct.IsCancellationRequested do
let consumeResult = consumer.Consume(ct)
if not consumeResult.IsPartitionEOF then do! (callback consumeResult)
with
| :? OperationCanceledException -> return ()
finally
consumer.Close()
consumer.Dispose()
}
问题 #1: 此代码是否正确,还是我在滥用 async
?
到目前为止一切顺利。在我的应用程序中,我必须处理许多 必须 完全死亡的消费者。所以,假设 consumers: seq<Async<unit>>
代表它们,下面的代码就是我想出的:
async {
for consumer in consumers do
do! (Async.StartChild consumer |> Async.Ignore).
}
我希望此代码将子项链接到父项的取消上下文,并且一旦取消,子项也将被取消。
问题 #2: 即使 child 被取消,我的 finally block gua运行 是否会 运行?
我对您的代码有两个看法:
您对
Async.StartChild
的使用是正确的 - 所有子计算都将继承相同的取消令牌,并且当主令牌被取消时它们都会被取消。可以在调用
consumer.Consume(ct)
之后和调用callback
之前取消异步工作流。我不确定这对您的特定问题意味着什么,但如果它从队列中删除了一些数据,则数据可能会在处理之前丢失。如果这是一个问题,那么我认为您需要使callback
成为非异步的,或者以不同的方式调用它。在您的
consumeUntilCancelled
函数中,您不需要明确检查ct.IsCancellationRequested
是否为真。异步工作流在每个do!
或let!
中自动执行此操作,因此您可以仅用while
循环替换它。
这是一个最小的独立演示:
let consume s = async {
try
while true do
do! Async.Sleep 1000
printfn "%s did work" s
finally
printfn "%s finalized" s }
let work =
async {
for c in ["A"; "B"; "C"; "D"] do
do! Async.StartChild (consume c) |> Async.Ignore }
现在我们使用取消令牌创建计算:
// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)
// Run this sometime later
ct.Cancel()
一旦你调用了ct.Cancel
,所有的finally
块都会被调用,所有的循环都会停止。