如何在 F# 中构建带有一些预取的惰性序列?

how can I build a lazy sequence with some prefetch, in F#?

我有一个应用程序需要摄取大量数据(大约 25gb)。

以前测试的时候数据很小,都是在ram里面加载的,现在要改成stream了

它被分成几 MB 的块,块通过异步函数加载,该函数可以根据几个因素从磁盘或数据库中提取块。

一种方法是像这样提供块:

let blockSequence =
    seq {
        for id in blockIds do
            let data = loadDataAsync id |> Async.runSynchronously
            yield Some data
        yield None
    }

由于有时必须从数据库中提取数据,因此速度可能会很慢,我想添加一些 'prefetch' 以便在消费时间之前获取块数据。

我的一个想法是构建一个数据加载器列表:

let loaders =
    blockIds
    |> List.map (fun id -> async { loadDataAsync id })

但是我会处理异步类型和我的 return 类型。

另一个想法是将其包装在一个惰性块中:

let loaders =
    blockIds
    |> List.map (fun id -> lazy (loadDataAsync id))

所以我有一个统一的类型,但我需要 'poke' 将它们前面的元素拉出来,总的来说会很乱。

然后我在考虑一个队列,我可以在其中始终保持 x 元素在它们被消耗之前加载,但在另一个线程上处理加载。 这可以通过有一个线程来检查队列中存在多少元素,如果它低于阈值,加载下一个并将其入队。

类似于:

let queue = ConcurrentQueue<DataType>()
let wait  = EventWaitHandle (false)
some thread ->
    for id in blockIds do
        wait.WaitForOne()
        if queue.Length < x then
            queue.Add (load...)
        wait.Reset()
        

main thread ->

    let data = queue.TryDequeue....
    wait.Set()

但我不可能是第一个有此需求的人,所以有没有人提出好的解决方案?


编辑:

我想出了一个解决方案,但我卡在了消耗部分。

// trades buffer
let private prefetch = 10
let private tradesBuffer = BlockingQueueAgent<TradeData [] option>(prefetch)

// producing the data
let thread = Thread(ThreadStart(fun _ ->
            async {
                for b in timeBlocks do
                    let! data = loadFromCacheAsync (makeFilename instrument interval b)
                    do! tradesBuffer.AsyncAdd (Some data)
                do! tradesBuffer.AsyncAdd (None)
            }
            |> Async.RunSynchronously
        )
    )


// consuming it
PSEUDO CODE THAT CAN'T WORK
seq {
    let rec pullData () =
        match tradesBuffer.Get() with
        | Some data ->
            yield Some data
            pullData ()
            
        | None ->
            yield None
    pullData ()                
}

如何使拉动看起来像一个序列?

这可行:

    seq {
        let mutable keepDoingIt = true
        while keepDoingIt do
            let data = tradesBuffer.Get ()
            yield data
            if data.IsNone then keepDoingIt <- false
    }

但我试图避免可变的(主要是作为练习,因为它已经足够好了)

您可以使用递归来避免 while 循环:

let rec loop () =
    seq {
        let data = tradesBuffer.Get ()
        yield data
        if data.IsSome then
            yield! loop()
    }