使用流媒体库为每个块附加延迟?

Append a delay to each chunk with the streaming library?

流媒体新手和 Haskell 此处。

我一直在研究 streaming 库,我对理解块部分特别感兴趣。例如:

S.print $ S.delay 1.0 $ concats $ chunksOf 2 $ S.each [1..10]

或者:

S.print $ concats $ S.maps (S.delay 1.0) $ chunksOf 2 $ S.each [1..10]

这里我可以在每个元素之后引入一个延迟,但我想要的是在每个块之后有一个延迟,在这种情况下,每个第二个元素都有一个延迟。我试过这个但没有编译:

S.print $ concats $ S.delay 1.0 $ chunksOf 2 $ S.each [1..10]

我怎样才能做到这一点?

我们需要的是一个函数,它在块流的末尾插入一个延迟,并将该函数传递给 maps

delay doesn't work here because it put delays between each yielded value. But we can do it easily using functions from Applicative:

  S.print 
$ concats 
$ S.maps (\s -> s <* liftIO (threadDelay 1000000)) 
$ chunksOf 2 
$ S.each [1..10]

这里发生了什么? mapsStream 的 "base functor" 应用转换。在用 chunksOf 获得的 "chunked stream" 中,基函子是 本身 一个 Stream。此外,转换必须保留 Stream.

的 return 值

Streams 可以用 (>>=) :: Stream f m a -> (a -> Stream f m b) -> Stream f m b if the next stream depends on the final result of the previous one, or with functions like (<*) :: Stream f m a -> Stream f m b -> Stream f m a 之类的函数排序,如果没有的话。 (<*) 保留第一个 Stream 的 return 值,这就是我们在这种情况下想要的。

我们不想产生更多的元素,只是为了引入延迟效果,所以我们简单地将效果 liftIO 放入 Stream monad 中。


另一种在 Stream 的每个产生值之后插入延迟的方法是 zip it with an infinite list 延迟:

delay' :: MonadIO m => Int -> Stream (Of a) m r -> Stream (Of a) m r
delay' micros s = S.zipWith const s (S.repeatM (liftIO (threadDelay micros)))