如何通过流式 ByteString 跟踪进度?

How to track progress through a streaming ByteString?

我正在使用 streaming-utils streaming-utils to stream a HTTP response body. I want to track the progress similar to how bytestring-progress allows with lazy ByteStrings. I suspect something like toChunks would be necessary, then reducing some cumulative bytes read and returning the original stream unmodified. But I cannot figure it out, and the streaming 文档非常无用,大部分都是与替代库的宏伟比较。

这是我迄今为止尽最大努力编写的一些代码。它还不包括计数,只是尝试在块流过时打印块的大小(并且不编译)。

download :: ByteString -> FilePath -> IO ()
download i file = do
  req <- parseRequest . C.unpack $ i
  m <- newHttpClientManager
  runResourceT $ do
    resp <- http req m
    lift . traceIO $ "downloading " <> file
    let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
    SBS.writeFile file body

step bs = do
  traceIO $ "got " <> show (C.length bs) <> " bytes"
  return bs

我们想要的是通过两种方式遍历Stream (Of ByteString) IO ()

  • 累积 ByteString 的传入长度并将更新打印到控制台。
  • 将流写入文件的一个。

我们可以借助 copy 函数来做到这一点,该函数的类型为:

copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r

copy 获取一个流并将其复制到两个不同的单子层中,其中原始流的每个元素都由新分离流的两个层发出。

(请注意,我们正在更改基本 monad,而不是仿函数。将仿函数更改为另一个 Stream 所做的是在单个流中 delimit groups,我们不感兴趣就是这里。)

以下函数获取一个流,复制它,使用 S.scan, prints them 和 returns 另一个您仍然可以使用的流来累积传入字符串的长度,例如将其写入文件:

{-# LANGUAGE OverloadedStrings #-}
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B

track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
      S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
    . S.scan (\s b -> s + B.length b) (0::Int) id
    $ S.copy stream

这将打印 ByteStrings 以及累计长度:

main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]