在 F# 中异步操作来自 streamReader 的数据

Asynchronously manipulating data from streamReader in F#

Read large txt file multithreaded?这一行,我怀疑它是否相当于将Seq的切片块传递给每个线程以及它是否会安全地处理并行性; StreamReader 是线程安全的吗?

这是我用来测试这个的代码(欢迎对所用模式提出任何建议或批评:))

nthreads = 4    

let Data = seq {
        use sr = new System.IO.StreamReader (filePath)
        while not sr.EndOfStream do
            yield sr.ReadLine ()
        }

let length = (Data |> Seq.length)

let packSize = length / nthreads

let groups =
     [ for i in 0..(nthreads - 1) -> if i < nthreads - 1  then Data |> Seq.skip( packSize * i )
                                                                    |> Seq.take( packSize )
                                                          else Data |> Seq.skip( packSize * i ) ]

let f = some_complex_function_modifiying_data

seq{ for a in groups -> f a }
        |> Async.Parallel
        |> Async.RunSynchronously

您的 Data 值的类型为 seq<string>,这意味着它是惰性的。这意味着当您执行一些访问它的计算时,惰性序列将创建一个新的 StreamReader 实例并独立于其他计算读取数据。

当您向 seq { .. } 块添加一些打印时,您可以很容易地看到这一点:

let Data = seq {
    printfn "reading"
    use sr = new System.IO.StreamReader (filePath)
    while not sr.EndOfStream do
        yield sr.ReadLine ()  }

因此,您的并行处理实际上没有问题。它将为每个并行线程创建一个新的计算,因此永远不会共享 StreamReader 个实例。

另一个问题是这是否真的有用 - 从磁盘读取数据通常是一个瓶颈,因此在一个循环中执行操作可能会更快。即使这可行,使用 Seq.length 是获取长度的缓慢方法(因为它需要读取整个文件)并且对于 skip 也是如此。更好(但更复杂)的解决方案可能是使用流 Seek.