流媒体库中的惯用预取
Idiomatic prefetching in a streaming library
我正在使用 streaming 库,但会接受使用管道或管道的答案。
说我有
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S
streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
flip fix 0 $ \go thingID ->
unless (thingID > lastID) $ do
thing <- highLatencyGet thingID
S.yield thing
go (thingID+1)
为了减少延迟,我想分叉 highLatencyGet
来检索下一个 Thing
,同时在消费者中处理前一个 Thing
。
显然我可以转换我的函数,创建一个新的 MVar
并在调用 yield
之前分叉下一批,等等
但我想知道是否有一种惯用的(可组合的)方法可以做到这一点,这样它就可以打包在一个库中,并且可以用于任意 IO Stream
s。理想情况下,我们也可以配置预取值,例如:
prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
此解决方案使用 管道 ,但它可以很容易地适应使用 流。准确地说,它需要 pipes、pipes-concurrency 和 async 包。
它不适用于 "direct" 风格。不是简单地转换 Producer
,它还需要一个消耗 Producer
的 "folding function"。这种连续传递样式对于设置和拆除并发机制是必需的。
import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)
prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
(outbox,inbox,seal) <- spawn' (bounded bufsize)
let cutcord effect = effect `finally` atomically seal
runConcurrently $
Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
*>
Concurrently (cutcord (foldfunc (fromInput inbox)))
原始生产者的输出被重定向到有界队列。同时,我们将producer-folding函数应用到一个从队列中读取的producer。
每当每个并发操作完成时,我们都会注意及时关闭通道以避免让另一端挂起。
我正在使用 streaming 库,但会接受使用管道或管道的答案。
说我有
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S
streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
flip fix 0 $ \go thingID ->
unless (thingID > lastID) $ do
thing <- highLatencyGet thingID
S.yield thing
go (thingID+1)
为了减少延迟,我想分叉 highLatencyGet
来检索下一个 Thing
,同时在消费者中处理前一个 Thing
。
显然我可以转换我的函数,创建一个新的 MVar
并在调用 yield
之前分叉下一批,等等
但我想知道是否有一种惯用的(可组合的)方法可以做到这一点,这样它就可以打包在一个库中,并且可以用于任意 IO Stream
s。理想情况下,我们也可以配置预取值,例如:
prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
此解决方案使用 管道 ,但它可以很容易地适应使用 流。准确地说,它需要 pipes、pipes-concurrency 和 async 包。
它不适用于 "direct" 风格。不是简单地转换 Producer
,它还需要一个消耗 Producer
的 "folding function"。这种连续传递样式对于设置和拆除并发机制是必需的。
import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)
prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
(outbox,inbox,seal) <- spawn' (bounded bufsize)
let cutcord effect = effect `finally` atomically seal
runConcurrently $
Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
*>
Concurrently (cutcord (foldfunc (fromInput inbox)))
原始生产者的输出被重定向到有界队列。同时,我们将producer-folding函数应用到一个从队列中读取的producer。
每当每个并发操作完成时,我们都会注意及时关闭通道以避免让另一端挂起。