如何在 F# 中构建带有一些预取的惰性序列?
how can I build a lazy sequence with some prefetch, in F#?
我有一个应用程序需要摄取大量数据(大约 25gb)。
以前测试的时候数据很小,都是在ram里面加载的,现在要改成stream了
它被分成几 MB 的块,块通过异步函数加载,该函数可以根据几个因素从磁盘或数据库中提取块。
一种方法是像这样提供块:
let blockSequence =
seq {
for id in blockIds do
let data = loadDataAsync id |> Async.runSynchronously
yield Some data
yield None
}
由于有时必须从数据库中提取数据,因此速度可能会很慢,我想添加一些 'prefetch' 以便在消费时间之前获取块数据。
我的一个想法是构建一个数据加载器列表:
let loaders =
blockIds
|> List.map (fun id -> async { loadDataAsync id })
但是我会处理异步类型和我的 return 类型。
另一个想法是将其包装在一个惰性块中:
let loaders =
blockIds
|> List.map (fun id -> lazy (loadDataAsync id))
所以我有一个统一的类型,但我需要 'poke' 将它们前面的元素拉出来,总的来说会很乱。
然后我在考虑一个队列,我可以在其中始终保持 x 元素在它们被消耗之前加载,但在另一个线程上处理加载。
这可以通过有一个线程来检查队列中存在多少元素,如果它低于阈值,加载下一个并将其入队。
类似于:
let queue = ConcurrentQueue<DataType>()
let wait = EventWaitHandle (false)
some thread ->
for id in blockIds do
wait.WaitForOne()
if queue.Length < x then
queue.Add (load...)
wait.Reset()
main thread ->
let data = queue.TryDequeue....
wait.Set()
但我不可能是第一个有此需求的人,所以有没有人提出好的解决方案?
编辑:
我想出了一个解决方案,但我卡在了消耗部分。
// trades buffer
let private prefetch = 10
let private tradesBuffer = BlockingQueueAgent<TradeData [] option>(prefetch)
// producing the data
let thread = Thread(ThreadStart(fun _ ->
async {
for b in timeBlocks do
let! data = loadFromCacheAsync (makeFilename instrument interval b)
do! tradesBuffer.AsyncAdd (Some data)
do! tradesBuffer.AsyncAdd (None)
}
|> Async.RunSynchronously
)
)
// consuming it
PSEUDO CODE THAT CAN'T WORK
seq {
let rec pullData () =
match tradesBuffer.Get() with
| Some data ->
yield Some data
pullData ()
| None ->
yield None
pullData ()
}
如何使拉动看起来像一个序列?
这可行:
seq {
let mutable keepDoingIt = true
while keepDoingIt do
let data = tradesBuffer.Get ()
yield data
if data.IsNone then keepDoingIt <- false
}
但我试图避免可变的(主要是作为练习,因为它已经足够好了)
您可以使用递归来避免 while
循环:
let rec loop () =
seq {
let data = tradesBuffer.Get ()
yield data
if data.IsSome then
yield! loop()
}
我有一个应用程序需要摄取大量数据(大约 25gb)。
以前测试的时候数据很小,都是在ram里面加载的,现在要改成stream了
它被分成几 MB 的块,块通过异步函数加载,该函数可以根据几个因素从磁盘或数据库中提取块。
一种方法是像这样提供块:
let blockSequence =
seq {
for id in blockIds do
let data = loadDataAsync id |> Async.runSynchronously
yield Some data
yield None
}
由于有时必须从数据库中提取数据,因此速度可能会很慢,我想添加一些 'prefetch' 以便在消费时间之前获取块数据。
我的一个想法是构建一个数据加载器列表:
let loaders =
blockIds
|> List.map (fun id -> async { loadDataAsync id })
但是我会处理异步类型和我的 return 类型。
另一个想法是将其包装在一个惰性块中:
let loaders =
blockIds
|> List.map (fun id -> lazy (loadDataAsync id))
所以我有一个统一的类型,但我需要 'poke' 将它们前面的元素拉出来,总的来说会很乱。
然后我在考虑一个队列,我可以在其中始终保持 x 元素在它们被消耗之前加载,但在另一个线程上处理加载。 这可以通过有一个线程来检查队列中存在多少元素,如果它低于阈值,加载下一个并将其入队。
类似于:
let queue = ConcurrentQueue<DataType>()
let wait = EventWaitHandle (false)
some thread ->
for id in blockIds do
wait.WaitForOne()
if queue.Length < x then
queue.Add (load...)
wait.Reset()
main thread ->
let data = queue.TryDequeue....
wait.Set()
但我不可能是第一个有此需求的人,所以有没有人提出好的解决方案?
编辑:
我想出了一个解决方案,但我卡在了消耗部分。
// trades buffer
let private prefetch = 10
let private tradesBuffer = BlockingQueueAgent<TradeData [] option>(prefetch)
// producing the data
let thread = Thread(ThreadStart(fun _ ->
async {
for b in timeBlocks do
let! data = loadFromCacheAsync (makeFilename instrument interval b)
do! tradesBuffer.AsyncAdd (Some data)
do! tradesBuffer.AsyncAdd (None)
}
|> Async.RunSynchronously
)
)
// consuming it
PSEUDO CODE THAT CAN'T WORK
seq {
let rec pullData () =
match tradesBuffer.Get() with
| Some data ->
yield Some data
pullData ()
| None ->
yield None
pullData ()
}
如何使拉动看起来像一个序列?
这可行:
seq {
let mutable keepDoingIt = true
while keepDoingIt do
let data = tradesBuffer.Get ()
yield data
if data.IsNone then keepDoingIt <- false
}
但我试图避免可变的(主要是作为练习,因为它已经足够好了)
您可以使用递归来避免 while
循环:
let rec loop () =
seq {
let data = tradesBuffer.Get ()
yield data
if data.IsSome then
yield! loop()
}