在 F# 中异步操作来自 streamReader 的数据
Asynchronously manipulating data from streamReader in F#
在Read large txt file multithreaded?这一行,我怀疑它是否相当于将Seq的切片块传递给每个线程以及它是否会安全地处理并行性; StreamReader 是线程安全的吗?
这是我用来测试这个的代码(欢迎对所用模式提出任何建议或批评:))
nthreads = 4
let Data = seq {
use sr = new System.IO.StreamReader (filePath)
while not sr.EndOfStream do
yield sr.ReadLine ()
}
let length = (Data |> Seq.length)
let packSize = length / nthreads
let groups =
[ for i in 0..(nthreads - 1) -> if i < nthreads - 1 then Data |> Seq.skip( packSize * i )
|> Seq.take( packSize )
else Data |> Seq.skip( packSize * i ) ]
let f = some_complex_function_modifiying_data
seq{ for a in groups -> f a }
|> Async.Parallel
|> Async.RunSynchronously
您的 Data
值的类型为 seq<string>
,这意味着它是惰性的。这意味着当您执行一些访问它的计算时,惰性序列将创建一个新的 StreamReader
实例并独立于其他计算读取数据。
当您向 seq { .. }
块添加一些打印时,您可以很容易地看到这一点:
let Data = seq {
printfn "reading"
use sr = new System.IO.StreamReader (filePath)
while not sr.EndOfStream do
yield sr.ReadLine () }
因此,您的并行处理实际上没有问题。它将为每个并行线程创建一个新的计算,因此永远不会共享 StreamReader
个实例。
另一个问题是这是否真的有用 - 从磁盘读取数据通常是一个瓶颈,因此在一个循环中执行操作可能会更快。即使这可行,使用 Seq.length
是获取长度的缓慢方法(因为它需要读取整个文件)并且对于 skip
也是如此。更好(但更复杂)的解决方案可能是使用流 Seek
.
在Read large txt file multithreaded?这一行,我怀疑它是否相当于将Seq的切片块传递给每个线程以及它是否会安全地处理并行性; StreamReader 是线程安全的吗?
这是我用来测试这个的代码(欢迎对所用模式提出任何建议或批评:))
nthreads = 4
let Data = seq {
use sr = new System.IO.StreamReader (filePath)
while not sr.EndOfStream do
yield sr.ReadLine ()
}
let length = (Data |> Seq.length)
let packSize = length / nthreads
let groups =
[ for i in 0..(nthreads - 1) -> if i < nthreads - 1 then Data |> Seq.skip( packSize * i )
|> Seq.take( packSize )
else Data |> Seq.skip( packSize * i ) ]
let f = some_complex_function_modifiying_data
seq{ for a in groups -> f a }
|> Async.Parallel
|> Async.RunSynchronously
您的 Data
值的类型为 seq<string>
,这意味着它是惰性的。这意味着当您执行一些访问它的计算时,惰性序列将创建一个新的 StreamReader
实例并独立于其他计算读取数据。
当您向 seq { .. }
块添加一些打印时,您可以很容易地看到这一点:
let Data = seq {
printfn "reading"
use sr = new System.IO.StreamReader (filePath)
while not sr.EndOfStream do
yield sr.ReadLine () }
因此,您的并行处理实际上没有问题。它将为每个并行线程创建一个新的计算,因此永远不会共享 StreamReader
个实例。
另一个问题是这是否真的有用 - 从磁盘读取数据通常是一个瓶颈,因此在一个循环中执行操作可能会更快。即使这可行,使用 Seq.length
是获取长度的缓慢方法(因为它需要读取整个文件)并且对于 skip
也是如此。更好(但更复杂)的解决方案可能是使用流 Seek
.