在 F# 中使用 FileHelperAsyncEngine

Using FileHelperAsyncEngine in F#

我正在尝试使用 FileHelpers 读取 csv,将行从 csv 文件加载到 f# 中的 Elasticsearch 数据库。一切都适用于小型测试文件,下面的代码片段一次读取所有记录

let readRows<'T>(filePath:string) =
    let engine = FileHelperEngine(typeof<'T>)

    engine.ReadFile(filePath)
    |> Array.map (fun row -> row :?> 'T)

不幸的是,它需要能够读取更大的文件,其中许多列后来被逐行丢弃。函数 FileHelperAsyncEngine.BeginReadFile returns 一个 IDisposable.

let readRowsAsync<'T>(filePath:string) =
    let engine = new FileHelperAsyncEngine(typeof<'T>)

    engine.BeginReadFile(filePath:string)
    |> ...

如何将此对象进一步处理为 <'T> 数组?

根据 the documentation,在调用 BeginReadFile 之后,engine 本身变成了一个可以迭代的可枚举序列(这是一个非常奇怪的设计决定)。所以你可以在它上面构建你自己的序列:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    for r in engine do
      if not (shouldDiscard r) then yield (map r)
  }

请注意,我使用的是 use 绑定,而不是 let。这将确保在序列结束或消费者停止对其进行迭代后处置一次性物品。

请注意,以下将工作,即使它会编译:

let readRowsAsync<'T>(filePath:string) = 
  let engine = new FileHelperAsyncEngine(typeof<'T>)
  use disposable = engine.BeginReadFile(filePath)

  engine |> Seq.filter (not << shouldDiscard) |> Seq.map map

如果你这样做,disposable 将在函数 returns 之后,但在结果枚举被迭代之前处理,从而在它的时间之前关闭文件。为确保正确处理一次性用品,您必须将整个内容包含在 seq 表达式中。

如果你真的想使用Seq.filter/Seq.map而不是for/yield,你仍然可以这样做,但是在seq里面表达式,像这样:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    yield! engine |> Seq.filter (not << shouldDiscard) |> Seq.map map
  }

您还可以将过滤和映射从 seq 表达式中取出(这将使您的函数更具可重用性),但 seq 表达式本身必须保留在原位,因为它控制处理部分:

let readRowsAsync<'T>(filePath:string) = 
  seq {
    let engine = new FileHelperAsyncEngine(typeof<'T>)
    use disposable = engine.BeginReadFile(filePath)

    yield! engine
  }

let results = 
  readRowsAsync<SomeType>( "someFile.txt" )
  |> Seq.filter (not << shouldDiscard) 
  |> Seq.map map

最后,必须注意的是,你应该小心处理这个序列,因为它持有一个非托管资源(即打开的文件):不要长时间持有它,不要使用阻塞处理时的操作等