在 F# 中使用 FileHelperAsyncEngine
Using FileHelperAsyncEngine in F#
我正在尝试使用 FileHelpers 读取 csv,将行从 csv 文件加载到 f# 中的 Elasticsearch 数据库。一切都适用于小型测试文件,下面的代码片段一次读取所有记录
let readRows<'T>(filePath:string) =
let engine = FileHelperEngine(typeof<'T>)
engine.ReadFile(filePath)
|> Array.map (fun row -> row :?> 'T)
不幸的是,它需要能够读取更大的文件,其中许多列后来被逐行丢弃。函数 FileHelperAsyncEngine.BeginReadFile returns 一个 IDisposable.
let readRowsAsync<'T>(filePath:string) =
let engine = new FileHelperAsyncEngine(typeof<'T>)
engine.BeginReadFile(filePath:string)
|> ...
如何将此对象进一步处理为 <'T> 数组?
根据 the documentation,在调用 BeginReadFile
之后,engine
本身变成了一个可以迭代的可枚举序列(这是一个非常奇怪的设计决定)。所以你可以在它上面构建你自己的序列:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
for r in engine do
if not (shouldDiscard r) then yield (map r)
}
请注意,我使用的是 use
绑定,而不是 let
。这将确保在序列结束或消费者停止对其进行迭代后处置一次性物品。
请注意,以下将不工作,即使它会编译:
let readRowsAsync<'T>(filePath:string) =
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
如果你这样做,disposable 将在函数 returns 之后,但在结果枚举被迭代之前处理,从而在它的时间之前关闭文件。为确保正确处理一次性用品,您必须将整个内容包含在 seq
表达式中。
如果你真的想使用Seq.filter
/Seq.map
而不是for
/yield
,你仍然可以这样做,但是在seq
里面表达式,像这样:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
yield! engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
}
您还可以将过滤和映射从 seq
表达式中取出(这将使您的函数更具可重用性),但 seq
表达式本身必须保留在原位,因为它控制处理部分:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
yield! engine
}
let results =
readRowsAsync<SomeType>( "someFile.txt" )
|> Seq.filter (not << shouldDiscard)
|> Seq.map map
最后,必须注意的是,你应该小心处理这个序列,因为它持有一个非托管资源(即打开的文件):不要长时间持有它,不要使用阻塞处理时的操作等
我正在尝试使用 FileHelpers 读取 csv,将行从 csv 文件加载到 f# 中的 Elasticsearch 数据库。一切都适用于小型测试文件,下面的代码片段一次读取所有记录
let readRows<'T>(filePath:string) =
let engine = FileHelperEngine(typeof<'T>)
engine.ReadFile(filePath)
|> Array.map (fun row -> row :?> 'T)
不幸的是,它需要能够读取更大的文件,其中许多列后来被逐行丢弃。函数 FileHelperAsyncEngine.BeginReadFile returns 一个 IDisposable.
let readRowsAsync<'T>(filePath:string) =
let engine = new FileHelperAsyncEngine(typeof<'T>)
engine.BeginReadFile(filePath:string)
|> ...
如何将此对象进一步处理为 <'T> 数组?
根据 the documentation,在调用 BeginReadFile
之后,engine
本身变成了一个可以迭代的可枚举序列(这是一个非常奇怪的设计决定)。所以你可以在它上面构建你自己的序列:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
for r in engine do
if not (shouldDiscard r) then yield (map r)
}
请注意,我使用的是 use
绑定,而不是 let
。这将确保在序列结束或消费者停止对其进行迭代后处置一次性物品。
请注意,以下将不工作,即使它会编译:
let readRowsAsync<'T>(filePath:string) =
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
如果你这样做,disposable 将在函数 returns 之后,但在结果枚举被迭代之前处理,从而在它的时间之前关闭文件。为确保正确处理一次性用品,您必须将整个内容包含在 seq
表达式中。
如果你真的想使用Seq.filter
/Seq.map
而不是for
/yield
,你仍然可以这样做,但是在seq
里面表达式,像这样:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
yield! engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
}
您还可以将过滤和映射从 seq
表达式中取出(这将使您的函数更具可重用性),但 seq
表达式本身必须保留在原位,因为它控制处理部分:
let readRowsAsync<'T>(filePath:string) =
seq {
let engine = new FileHelperAsyncEngine(typeof<'T>)
use disposable = engine.BeginReadFile(filePath)
yield! engine
}
let results =
readRowsAsync<SomeType>( "someFile.txt" )
|> Seq.filter (not << shouldDiscard)
|> Seq.map map
最后,必须注意的是,你应该小心处理这个序列,因为它持有一个非托管资源(即打开的文件):不要长时间持有它,不要使用阻塞处理时的操作等