F#异步序列计算卡住

F# asynchronous sequence calculation gets stuck

给定以下(简化的)F# 异步代码,对来自 asyncSeq 的项目进行分区

open FSharp.Control

let s : AsyncSeq<'t> = ...

let a =  
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))
    |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)
    |> AsyncSeq.map (fun (index, tWithIndex) -> (index + 1, tWithIndex |> AsyncSeq.map (fun (_, t) -> t)))
    |> AsyncSeq.toArraySynchronously
    |> Map.ofArray
    |> Map.map (fun i a -> a |> AsyncSeq.toArraySynchronously)

let b =  
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))
    |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)
    |> AsyncSeq.map (fun (index, tWithIndex) -> index + 1, AsyncSeq.toArraySynchronously tWithIndex |> Array.map (fun (_, t) -> t))
    |> AsyncSeq.toArraySynchronously
    |> Map.ofArray

我原以为 a 和 b 包含相同的 Map 值,但 b 计算从未完成,它卡住了。 b的表情有什么问题吗?

由于 AsyncSeq.groupBy 的工作方式,这有点出乎意料。它创建一个异步序列(组),每个序列都包含一个评估的键和组中另一个异步值序列:

AsyncSeq<'TKey * AsyncSeq<'TValue>>

问题是嵌套的 AsyncSeq<'TValue> 序列只有在组的外部异步序列被评估到最后时才会获得它们的所有值。如果您从嵌套的异步序列中请求一个值,它将阻塞直到计算外部序列。

我可以想象 groupBy 的实现,其中在嵌套异步序列中请求一个值实际上会恢复评估,但显然这不是当前的实现方式。

除了在 a 版本中执行的操作外,您还可以只使用 lazy 关键字来延迟 AsyncSeq.map:

中的计算
let b =  
  s
  |> AsyncSeq.mapi (fun i a -> (i, a))
  |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)
  |> AsyncSeq.map (fun (index, tWithIndex) -> 
       index + 1, 
       lazy ( AsyncSeq.toArraySynchronously tWithIndex 
              |> Array.map (fun (_, t) -> t) ))
  |> AsyncSeq.toArraySynchronously
  |> Map.ofArray