Haskell 中字节流的高效流式传输和操作
Efficient streaming and manipulation of a byte stream in Haskell
在为大型 (<bloblength><blob>)*
编码二进制文件编写反序列化程序时,我遇到了各种 Haskell 生产-转换-消费库。到目前为止,我知道有四个流媒体库:
- Data.Conduit:使用广泛,资源管理非常细致
- Pipes: Similar to
conduit
(Haskell Cast #6 很好地揭示了 conduit
和 pipes
) 之间的区别
- Data.Binary.Get: 提供getWord32be等有用的函数,但是流式的例子很尴尬
- System.IO.Streams: 好像是最容易用的
这是一个精简示例,说明当我尝试使用 conduit
进行 Word32
流式传输时出现问题的地方。一个稍微更实际的例子是首先读取一个 Word32
来确定 blob 的长度,然后产生一个惰性的 ByteString
该长度(然后进一步反序列化)。
但在这里我只是尝试从二进制文件中以流方式提取 Word32:
module Main where
-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as BL
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Word (Word32)
import System.Environment (getArgs)
-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
G.runGet G.getWord32be $ BL.fromStrict bs
-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
mbs <- await
case mbs of
Just bs -> do
case C.null bs of
False -> do
yield $ getWord32 bs
leftover $ BS.drop 4 bs
transform
True -> return ()
Nothing -> return ()
main :: IO ()
main = do
filename <- fmap (!!0) getArgs -- should check length getArgs
result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
print $ length result -- is always 8188 for files larger than 32752 bytes
程序的输出只是读取的 Word32 的数量。结果流在读取第一个块(大约 32KiB)后终止。出于某种原因,mbs
永远不会是 Nothing
,所以我必须检查 null bs
,它会在块被消耗时停止流。显然,我的管道 transform
有问题。我看到了两条解决方案:
await
不想去ByteStream
的第二个chunk,所以有没有别的函数拉下一个chunk?在我见过的示例中(例如 Conduit 101),这不是完成的方式
- 这只是错误的设置方式
transform
。
这是如何正确完成的?这是正确的方法吗? (性能很重要。)
更新:这是一个糟糕的方法,使用Systems.IO.Streams
:
module Main where
import Data.Word (Word32)
import System.Environment (getArgs)
import System.IO (IOMode (ReadMode), openFile)
import qualified System.IO.Streams as S
import System.IO.Streams.Binary (binaryInputStream)
import System.IO.Streams.List (outputToList)
main :: IO ()
main = do
filename : _ <- getArgs
h <- openFile filename ReadMode
s <- S.handleToInputStream h
i <- binaryInputStream s :: IO (S.InputStream Word32)
r <- outputToList $ S.connect i
print $ last r
'Bad'表示:对时间要求很高space,不处理解码异常
您的直接问题是由您的使用方式引起的 leftover
。该函数用于 "Provide a single piece of leftover input to be consumed by the next component in the current monadic binding",因此当您在使用 transform
循环之前给它 bs
时,您实际上是在丢弃剩余的字节串(即 bs
之后的内容) ).
基于您的代码的正确解决方案将使用 the incremental input interface of Data.Binary.Get
to replace your yield
/leftover
combination with something that consumes each chunk fully. A more pragmatic approach, though, is using the binary-conduit package, which provides that in the shape of conduitGet
(its source 可以很好地了解 "manual" 实现的样子):
import Data.Conduit.Serialization.Binary
-- etc.
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be
需要注意的是,如果字节总数不是 4 的倍数(即最后一个 Word32
不完整),这将引发解析错误。在不太可能的情况下,这不是你想要的,一个懒惰的出路就是简单地在输入字节串上使用 \bs -> C.take (4 * truncate (C.length bs / 4)) bs
。
使用 pipes
(以及 pipes-group
和 pipes-bytestring
),演示问题简化为组合器。首先,我们将传入的未区分字节流解析为 4 字节的小块:
chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n)
然后我们将它们映射到 Word32
s 并(在这里)计算它们。
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
print n
如果我们少于 4 个字节或无法解析,这将失败,但我们也可以使用
进行映射
getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
Left r -> Nothing
Right (_, off, w32) -> Just w32
接下来的程序将打印有效 4 字节序列的解析
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ chunksOfStrict 4 (Bytes.fromHandle h)
>-> P.map getMaybeWord32
>-> P.concat -- here `concat` eliminates maybes
>-> P.print
当然,还有其他处理失败解析的方法。
不过,这里有一些更接近您要求的程序。它从字节流 (Producer ByteString m r
) 中取出一个四字节的段,如果足够长,则将其读取为 Word32
;然后它需要 那么多 的传入字节并将它们累积到一个惰性字节串中,产生它。它只是重复这个直到它用完字节。在下面的 main
中,我打印了每个生成的惰性字节串:
module Main (main) where
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Group (folds)
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL
import System.Environment ( getArgs )
splitLazy :: (Monad m, Integral n) =>
n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
(bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
return (BL.fromChunks bss, rest)
measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
(lbs, rest) <- lift $ splitLazy 4 bs
if BL.length lbs /= 4
then rest >-> P.drain -- in fact it will be empty
else do
let w32 = G.runGet G.getWord32be lbs
(lbs', rest') <- lift $ splitLazy w32 bs
yield lbs
measureChunks rest
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print
这又很粗糙,因为它使用 runGet
而不是 runGetOrFail
,但这很容易修复。管道标准程序是在失败的解析和 return 未解析的字节流上停止流转换。
如果你预期 Word32s
是大数,所以你不想将相应的字节流累积为惰性字节串,而是说将它们写入不同的文件而不累积,我们可以很容易地改变程序来做到这一点。这将需要复杂地使用管道,但这是 pipes
和 streaming
.
的首选方法
这里有一个我想投入使用的相对简单的解决方案。它重复使用 splitAt
包装到 State
monad 中,提供与 Data.Binary.Get
(的子集)相同的接口。结果 [ByteString]
是在 main
中获得的,在 getBlob
上有一个 whileJust
。
module Main (main) where
import Control.Monad.Loops
import Control.Monad.State
import qualified Data.Binary.Get as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Word (Word32)
import System.Environment (getArgs)
-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString
getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
let (w, rest) = BL.splitAt 4 bs
case BL.length w of
4 -> (Just w', rest) where
w' = G.runGet G.getWord32be w
_ -> (Nothing, BL.empty)
getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs
getBlob :: Get (Maybe BL.ByteString)
getBlob = do
ml <- getWord32be
case ml of
Nothing -> return Nothing
Just l -> do
blob <- getLazyByteString (fromIntegral l :: Int64)
return $ Just blob
runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs
main :: IO ()
main = do
fname <- head <$> getArgs
bs <- BL.readFile fname
let ls = runGet loop bs where
loop = whileJust getBlob return
print $ length ls
getBlob
中没有错误处理,但很容易扩展。时间和 space 复杂度相当好,只要小心使用结果列表。 (创建一些随机数据供上面使用的 python 脚本是 here)。
在为大型 (<bloblength><blob>)*
编码二进制文件编写反序列化程序时,我遇到了各种 Haskell 生产-转换-消费库。到目前为止,我知道有四个流媒体库:
- Data.Conduit:使用广泛,资源管理非常细致
- Pipes: Similar to
conduit
(Haskell Cast #6 很好地揭示了conduit
和pipes
) 之间的区别
- Data.Binary.Get: 提供getWord32be等有用的函数,但是流式的例子很尴尬
- System.IO.Streams: 好像是最容易用的
这是一个精简示例,说明当我尝试使用 conduit
进行 Word32
流式传输时出现问题的地方。一个稍微更实际的例子是首先读取一个 Word32
来确定 blob 的长度,然后产生一个惰性的 ByteString
该长度(然后进一步反序列化)。
但在这里我只是尝试从二进制文件中以流方式提取 Word32:
module Main where
-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as BL
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Word (Word32)
import System.Environment (getArgs)
-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
G.runGet G.getWord32be $ BL.fromStrict bs
-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
mbs <- await
case mbs of
Just bs -> do
case C.null bs of
False -> do
yield $ getWord32 bs
leftover $ BS.drop 4 bs
transform
True -> return ()
Nothing -> return ()
main :: IO ()
main = do
filename <- fmap (!!0) getArgs -- should check length getArgs
result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
print $ length result -- is always 8188 for files larger than 32752 bytes
程序的输出只是读取的 Word32 的数量。结果流在读取第一个块(大约 32KiB)后终止。出于某种原因,mbs
永远不会是 Nothing
,所以我必须检查 null bs
,它会在块被消耗时停止流。显然,我的管道 transform
有问题。我看到了两条解决方案:
await
不想去ByteStream
的第二个chunk,所以有没有别的函数拉下一个chunk?在我见过的示例中(例如 Conduit 101),这不是完成的方式- 这只是错误的设置方式
transform
。
这是如何正确完成的?这是正确的方法吗? (性能很重要。)
更新:这是一个糟糕的方法,使用Systems.IO.Streams
:
module Main where
import Data.Word (Word32)
import System.Environment (getArgs)
import System.IO (IOMode (ReadMode), openFile)
import qualified System.IO.Streams as S
import System.IO.Streams.Binary (binaryInputStream)
import System.IO.Streams.List (outputToList)
main :: IO ()
main = do
filename : _ <- getArgs
h <- openFile filename ReadMode
s <- S.handleToInputStream h
i <- binaryInputStream s :: IO (S.InputStream Word32)
r <- outputToList $ S.connect i
print $ last r
'Bad'表示:对时间要求很高space,不处理解码异常
您的直接问题是由您的使用方式引起的 leftover
。该函数用于 "Provide a single piece of leftover input to be consumed by the next component in the current monadic binding",因此当您在使用 transform
循环之前给它 bs
时,您实际上是在丢弃剩余的字节串(即 bs
之后的内容) ).
基于您的代码的正确解决方案将使用 the incremental input interface of Data.Binary.Get
to replace your yield
/leftover
combination with something that consumes each chunk fully. A more pragmatic approach, though, is using the binary-conduit package, which provides that in the shape of conduitGet
(its source 可以很好地了解 "manual" 实现的样子):
import Data.Conduit.Serialization.Binary
-- etc.
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be
需要注意的是,如果字节总数不是 4 的倍数(即最后一个 Word32
不完整),这将引发解析错误。在不太可能的情况下,这不是你想要的,一个懒惰的出路就是简单地在输入字节串上使用 \bs -> C.take (4 * truncate (C.length bs / 4)) bs
。
使用 pipes
(以及 pipes-group
和 pipes-bytestring
),演示问题简化为组合器。首先,我们将传入的未区分字节流解析为 4 字节的小块:
chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n)
然后我们将它们映射到 Word32
s 并(在这里)计算它们。
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
print n
如果我们少于 4 个字节或无法解析,这将失败,但我们也可以使用
进行映射getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
Left r -> Nothing
Right (_, off, w32) -> Just w32
接下来的程序将打印有效 4 字节序列的解析
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ chunksOfStrict 4 (Bytes.fromHandle h)
>-> P.map getMaybeWord32
>-> P.concat -- here `concat` eliminates maybes
>-> P.print
当然,还有其他处理失败解析的方法。
不过,这里有一些更接近您要求的程序。它从字节流 (Producer ByteString m r
) 中取出一个四字节的段,如果足够长,则将其读取为 Word32
;然后它需要 那么多 的传入字节并将它们累积到一个惰性字节串中,产生它。它只是重复这个直到它用完字节。在下面的 main
中,我打印了每个生成的惰性字节串:
module Main (main) where
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Group (folds)
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL
import System.Environment ( getArgs )
splitLazy :: (Monad m, Integral n) =>
n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
(bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
return (BL.fromChunks bss, rest)
measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
(lbs, rest) <- lift $ splitLazy 4 bs
if BL.length lbs /= 4
then rest >-> P.drain -- in fact it will be empty
else do
let w32 = G.runGet G.getWord32be lbs
(lbs', rest') <- lift $ splitLazy w32 bs
yield lbs
measureChunks rest
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print
这又很粗糙,因为它使用 runGet
而不是 runGetOrFail
,但这很容易修复。管道标准程序是在失败的解析和 return 未解析的字节流上停止流转换。
如果你预期 Word32s
是大数,所以你不想将相应的字节流累积为惰性字节串,而是说将它们写入不同的文件而不累积,我们可以很容易地改变程序来做到这一点。这将需要复杂地使用管道,但这是 pipes
和 streaming
.
这里有一个我想投入使用的相对简单的解决方案。它重复使用 splitAt
包装到 State
monad 中,提供与 Data.Binary.Get
(的子集)相同的接口。结果 [ByteString]
是在 main
中获得的,在 getBlob
上有一个 whileJust
。
module Main (main) where
import Control.Monad.Loops
import Control.Monad.State
import qualified Data.Binary.Get as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Word (Word32)
import System.Environment (getArgs)
-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString
getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
let (w, rest) = BL.splitAt 4 bs
case BL.length w of
4 -> (Just w', rest) where
w' = G.runGet G.getWord32be w
_ -> (Nothing, BL.empty)
getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs
getBlob :: Get (Maybe BL.ByteString)
getBlob = do
ml <- getWord32be
case ml of
Nothing -> return Nothing
Just l -> do
blob <- getLazyByteString (fromIntegral l :: Int64)
return $ Just blob
runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs
main :: IO ()
main = do
fname <- head <$> getArgs
bs <- BL.readFile fname
let ls = runGet loop bs where
loop = whileJust getBlob return
print $ length ls
getBlob
中没有错误处理,但很容易扩展。时间和 space 复杂度相当好,只要小心使用结果列表。 (创建一些随机数据供上面使用的 python 脚本是 here)。