如何通过流式 ByteString 跟踪进度?
How to track progress through a streaming ByteString?
我正在使用 streaming-utils streaming-utils to stream a HTTP response body. I want to track the progress similar to how bytestring-progress allows with lazy ByteString
s. I suspect something like toChunks
would be necessary, then reducing some cumulative bytes read and returning the original stream unmodified. But I cannot figure it out, and the streaming 文档非常无用,大部分都是与替代库的宏伟比较。
这是我迄今为止尽最大努力编写的一些代码。它还不包括计数,只是尝试在块流过时打印块的大小(并且不编译)。
download :: ByteString -> FilePath -> IO ()
download i file = do
req <- parseRequest . C.unpack $ i
m <- newHttpClientManager
runResourceT $ do
resp <- http req m
lift . traceIO $ "downloading " <> file
let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
SBS.writeFile file body
step bs = do
traceIO $ "got " <> show (C.length bs) <> " bytes"
return bs
我们想要的是通过两种方式遍历Stream (Of ByteString) IO ()
:
- 累积
ByteString
的传入长度并将更新打印到控制台。
- 将流写入文件的一个。
我们可以借助 copy
函数来做到这一点,该函数的类型为:
copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
copy
获取一个流并将其复制到两个不同的单子层中,其中原始流的每个元素都由新分离流的两个层发出。
(请注意,我们正在更改基本 monad,而不是仿函数。将仿函数更改为另一个 Stream
所做的是在单个流中 delimit groups,我们不感兴趣就是这里。)
以下函数获取一个流,复制它,使用 S.scan
, prints them 和 returns 另一个您仍然可以使用的流来累积传入字符串的长度,例如将其写入文件:
{-# LANGUAGE OverloadedStrings #-}
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B
track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
. S.scan (\s b -> s + B.length b) (0::Int) id
$ S.copy stream
这将打印 ByteString
s 以及累计长度:
main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]
我正在使用 streaming-utils streaming-utils to stream a HTTP response body. I want to track the progress similar to how bytestring-progress allows with lazy ByteString
s. I suspect something like toChunks
would be necessary, then reducing some cumulative bytes read and returning the original stream unmodified. But I cannot figure it out, and the streaming 文档非常无用,大部分都是与替代库的宏伟比较。
这是我迄今为止尽最大努力编写的一些代码。它还不包括计数,只是尝试在块流过时打印块的大小(并且不编译)。
download :: ByteString -> FilePath -> IO ()
download i file = do
req <- parseRequest . C.unpack $ i
m <- newHttpClientManager
runResourceT $ do
resp <- http req m
lift . traceIO $ "downloading " <> file
let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
SBS.writeFile file body
step bs = do
traceIO $ "got " <> show (C.length bs) <> " bytes"
return bs
我们想要的是通过两种方式遍历Stream (Of ByteString) IO ()
:
- 累积
ByteString
的传入长度并将更新打印到控制台。 - 将流写入文件的一个。
我们可以借助 copy
函数来做到这一点,该函数的类型为:
copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
copy
获取一个流并将其复制到两个不同的单子层中,其中原始流的每个元素都由新分离流的两个层发出。
(请注意,我们正在更改基本 monad,而不是仿函数。将仿函数更改为另一个 Stream
所做的是在单个流中 delimit groups,我们不感兴趣就是这里。)
以下函数获取一个流,复制它,使用 S.scan
, prints them 和 returns 另一个您仍然可以使用的流来累积传入字符串的长度,例如将其写入文件:
{-# LANGUAGE OverloadedStrings #-}
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B
track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
. S.scan (\s b -> s + B.length b) (0::Int) id
$ S.copy stream
这将打印 ByteString
s 以及累计长度:
main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]