在 F# 中展平嵌套的异步序列

Flattening nested async sequences in F#

考虑一系列 F# 序列:

let seqOf123 = seq { for i in 1..3 do yield i }
let seqOf456 = seq { for i in 4..6 do yield i }
let seqOf789 = seq { for i in 7..9 do yield i }

...以及包含所有这些的序列:

let seqOfSeqOfNums = 
    seq {
        yield seqOf123
        yield seqOf456
        yield seqOf789
    }

现在我们有了一系列序列,我们可以使用内置的 Seq.concat 函数将其展平并包裹在 async 子句中以异步执行:

let firstAsyncSeqOfNums = async { return Seq.concat seqOfSeqOfNums }

我们有一个由 9 个数字组成的异步序列,带有签名 Async<seq<int>> 我们将返回。

现在考虑一系列异步序列:

let asyncSeqOf123 = async { return seqOf123 }
let asyncSeqOf456 = async { return seqOf456 }
let asyncSeqOf789 = async { return seqOf789 }

...以及包含它们的序列:

let seqOfAsyncSeqOfNums = 
    seq {
        yield asyncSeqOf123
        yield asyncSeqOf456
        yield asyncSeqOf789
    }

我们现在有一个 seq<Async<seq<int>>> 类型的序列。我们不能使用 Seq.concat 来展平这个,因为它是一系列异步序列。那么我们如何将其转换为所有整数数据都被展平的类型 Async<seq<int>> 呢?我们可以尝试做以下事情:

let secondAsyncSeqOfNums = 
    async {
        return seqOfAsyncSeqOfNums
        |> Seq.map (fun x -> x |> Async.RunSynchronously)
        |> Seq.concat
    }

看起来它完成了它的工作:它有一个类型 Async<seq<int>>,如果我们将它传递给 Async.RunSynchronously,它将产生相同的 9 个整数序列。但是它产生相同序列的方式并不等同于上面出现的 firstAsyncSeqOfNums。 secondAsyncSeqOfNums 的实现在生成单个扁平序列期间为每个嵌套的整数序列调用 Async.RunSynchronously。但这可以避免吗?请注意,我们正在生成一个异步扁平序列,理想情况下只需要一次调用 Async.RunSynchronously 来评估其内容。但是如果 Async.RunSynchronously 被多次调用,我找不到重写代码的方法。

您是否正在寻找这样的东西:

> let aMap f wf = async {                              
-    let! a = wf
-    return f a
-    };;

val aMap : f:('a -> 'b) -> wf:Async<'a> -> Async<'b>

> let aConcat wf = Async.Parallel wf |> aMap Seq.concat;;   

val aConcat : wf:seq<Async<#seq<'b>>> -> Async<seq<'b>>