流媒体库中的惯用预取

Idiomatic prefetching in a streaming library

我正在使用 streaming 库,但会接受使用管道或管道的答案。

说我有

import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- highLatencyGet thingID
      S.yield thing
      go (thingID+1)

为了减少延迟,我想分叉 highLatencyGet 来检索下一个 Thing,同时在消费者中处理前一个 Thing

显然我可以转换我的函数,创建一个新的 MVar 并在调用 yield 之前分叉下一批,等等

但我想知道是否有一种惯用的(可组合的)方法可以做到这一点,这样它就可以打包在一个库中,并且可以用于任意 IO Streams。理想情况下,我们也可以配置预取值,例如:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()

此解决方案使用 管道 ,但它可以很容易地适应使用 。准确地说,它需要 pipespipes-concurrencyasync 包。

它不适用于 "direct" 风格。不是简单地转换 Producer,它还需要一个消耗 Producer 的 "folding function"。这种连续传递样式对于设置和拆除并发机制是必需的。

import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox,inbox,seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))

原始生产者的输出被重定向到有界队列。同时,我们将producer-folding函数应用到一个从队列中读取的producer。

每当每个并发操作完成时,我们都会注意及时关闭通道以避免让另一端挂起。