Efficient streaming and manipulation of a byte stream in Haskell

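While writing a deserialiser for a large (<bloblength><blob>)* encoded binary file, I came across the various Haskell produce-transform-consume libraries. So far I am aware of four streaming libraries.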

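Here is a stripped-down example of where things go wrong when I try to stream Word32 values with conduit. A slightly more realistic example would first read a Word32 that determines the blob length and then yield a lazy ByteString of that length (which is then deserialised further). But here I just try to extract Word32s from a binary file in streaming fashion: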

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read ByteString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes

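The output of the program is just the number of Word32s that were read. It turns out that the stream terminates after reading the first chunk (about 32 KiB). For some reason mbs is never Nothing, so I have to check null bs, which stops the stream once the chunk is consumed. Clearly, my conduit transform is faulty. I see two routes to a solution: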

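  1. await does not want to move on to the second chunk of the ByteStream, so is there another function that pulls the next chunk? In the examples I have seen (e.g. Conduit 101) this is not how it is done.
  2. This is simply the wrong way to set up transform.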

How is this done properly? Is this the right way to go? (Performance does matter.)

Update: here is a BAD way to do it, using System.IO.Streams:

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r

'Bad' means: very demanding in time and space, and it does not handle decoding exceptions.

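Your immediate problem is caused by how you are using leftover. That function is used to "Provide a single piece of leftover input to be consumed by the next component in the current monadic binding", and so when you give it bs before looping with transform you are effectively throwing away the rest of the bytestring (i.e. what comes after bs).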

A correct solution based on your code would use the incremental input interface of Data.Binary.Get to replace your yield/leftover combination with something that consumes each chunk fully. A more pragmatic approach, though, is to use the binary-conduit package, which provides that in the shape of conduitGet (its source gives a good idea of what a "manual" implementation would look like):

import           Data.Conduit.Serialization.Binary

-- etc.

transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be

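Note that this will raise a parse error if the total number of bytes is not a multiple of 4 (i.e. the last Word32 is incomplete). In the unlikely case that this is not what you want, a lazy way out would be to simply apply \bs -> C.take (4 * (C.length bs `div` 4)) bs to the input bytestring.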
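For readers curious what the "manual" route through the incremental input interface of Data.Binary.Get might look like, here is a minimal sketch. It is not the binary-conduit implementation: it works on a plain list of strict chunks rather than inside a conduit, the name decodeWord32s is made up for illustration, and it silently drops an incomplete trailing Word32 instead of raising a parse error.

```haskell
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import           Data.Word       (Word32)

-- | Decode big-endian Word32 values from a sequence of strict chunks.
-- Bytes left over from one chunk are carried into the next one, so a
-- value may straddle a chunk boundary. (Hypothetical helper, not part
-- of any library.)
decodeWord32s :: [BS.ByteString] -> [Word32]
decodeWord32s = go fresh
  where
    -- A decoder that has not seen any input yet.
    fresh = G.runGetIncremental G.getWord32be

    -- One value decoded: emit it and restart on the unconsumed bytes.
    go (G.Done rest _ w) chunks = w : go fresh (rest : chunks)
    -- The decoder wants more input: feed it the next non-empty chunk.
    go d@(G.Partial k) (c : cs)
      | BS.null c = go d cs
      | otherwise = go (k (Just c)) cs
    -- Input exhausted; fewer than 4 pending bytes are dropped.
    go (G.Partial _) [] = []
    -- getWord32be only fails on missing input, but stay total anyway.
    go (G.Fail _ _ _) _ = []
```

The same three-way case analysis on Done, Partial and Fail is what a conduit version would perform, with await supplying the chunks and yield emitting the values.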

With pipes (and pipes-group and pipes-bytestring) the demo problem reduces to combinators. First we resolve the incoming undifferentiated byte stream into little 4-byte chunks:

chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n) 

Then we map these to Word32s and (here) count them.

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
     print n

This will fail if we meet fewer than 4 bytes or otherwise fail to parse, but we can just as well map with

getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case  G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
  Left r -> Nothing
  Right (_, off, w32) -> Just w32

The following program will then print the parses for the valid 4-byte sequences:

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ chunksOfStrict 4 (Bytes.fromHandle h) 
                 >-> P.map getMaybeWord32
                 >-> P.concat  -- here `concat` eliminates maybes
                 >-> P.print 

There are, of course, other ways of dealing with failed parses.

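Here, though, is a program closer to what you asked for. It takes a four-byte segment from the byte stream (Producer ByteString m r) and, if it is long enough, reads it as a Word32; it then takes that many of the incoming bytes and accumulates them into a lazy bytestring, which it yields. It just repeats this until it runs out of bytes. In main below, I print each lazy bytestring that is produced: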

module Main (main) where 
import Pipes 
import qualified Pipes.Prelude as P
import Pipes.Group (folds) 
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((^.))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL 
import System.Environment ( getArgs )

splitLazy :: (Monad m, Integral n) =>
   n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
  (bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
  return (BL.fromChunks bss, rest)

measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
 (lbs, rest) <- lift $ splitLazy 4 bs
 if BL.length lbs /= 4
   then rest >-> P.drain -- in fact it will be empty
   else do
     let w32 = G.runGet G.getWord32be lbs
     (lbs', rest') <- lift $ splitLazy w32 rest
     yield lbs'
     measureChunks rest'

main :: IO ()
main = do
  filename:_ <- getArgs
  IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print

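This is again crude in that it uses runGet rather than runGetOrFail, but this is easily repaired. The standard pipes procedure would be to stop the stream transformation on a failed parse and return the unparsed byte stream.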
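As a rough illustration of that repair, the length prefix could be parsed with runGetOrFail so that a short or malformed header becomes a value rather than an exception. The helper name parseLength is hypothetical and only covers the parsing step, not the surrounding pipes code.

```haskell
import qualified Data.Binary.Get      as G
import qualified Data.ByteString.Lazy as BL
import           Data.Word            (Word32)

-- | Total stand-in for @G.runGet G.getWord32be@: a short length prefix
-- is reported as 'Left' with the parser's message instead of throwing.
-- (Hypothetical helper for illustration.)
parseLength :: BL.ByteString -> Either String Word32
parseLength lbs =
  case G.runGetOrFail G.getWord32be lbs of
    Left  (_, _, err) -> Left err   -- unconsumed input and offset ignored here
    Right (_, _, w32) -> Right w32
```

Inside measureChunks one could branch on this result instead of calling runGet. How the Left case is surfaced (draining the rest, or changing the return type so that the unparsed producer is handed back) is a design decision this sketch leaves open.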

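If you were anticipating that the Word32s would be large numbers, so that you did not want to accumulate the corresponding stream of bytes as a lazy bytestring but, say, write them to different files without accumulating, we could change the program pretty easily to do that. This would require a sophisticated use of conduit, but it is the preferred approach with pipes and streaming.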

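Here is a relatively straightforward solution that I want to throw into the ring. It is a repeated use of splitAt wrapped in a State monad, which gives an interface identical to (a subset of) Data.Binary.Get. The resulting [ByteString] is obtained in main with a whileJust over getBlob.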

module Main (main) where

import           Control.Monad.Loops
import           Control.Monad.State
import qualified Data.Binary.Get      as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)
import           Data.Word            (Word32)
import           System.Environment   (getArgs)

-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString

getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
    let (w, rest) = BL.splitAt 4 bs
    case BL.length w of
        4 -> (Just w', rest) where
            w' = G.runGet G.getWord32be w
        _ -> (Nothing, BL.empty)

getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs

getBlob :: Get (Maybe BL.ByteString)
getBlob = do
    ml <- getWord32be
    case ml of
        Nothing -> return Nothing
        Just l -> do
            blob <- getLazyByteString (fromIntegral l :: Int64)
            return $ Just blob

runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs

main :: IO ()
main = do
    fname <- head <$> getArgs
    bs <- BL.readFile fname
    let ls = runGet loop bs where
        loop = whileJust getBlob return
    print $ length ls

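There is no error handling in getBlob, but it is easy to extend. Time and space complexity are quite good, as long as the resulting list is used with care. (The Python script that creates some random data for consumption by the above is here.)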
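One possible shape for that extension is sketched below. It is written as a plain function over a lazy ByteString rather than inside the State wrapper above, and the names blobs and BlobError are invented for this example. Note that checking the blob length forces the whole blob to be read before it is emitted, which may or may not be acceptable for very large blobs.

```haskell
import qualified Data.Binary.Get      as G
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)

-- | What can go wrong while splitting (hypothetical error type).
data BlobError
  = TruncatedLength Int64        -- ^ bytes available for the 4-byte prefix
  | TruncatedBlob Int64 Int64    -- ^ expected length, bytes actually available
  deriving (Eq, Show)

-- | Lazily split (<bloblength><blob>)* input into blobs. The list ends
-- at the end of input or with a single 'Left' on truncated input.
blobs :: BL.ByteString -> [Either BlobError BL.ByteString]
blobs bs
  | BL.null bs           = []
  | BL.length hdr < 4    = [Left (TruncatedLength (BL.length hdr))]
  | BL.length blob < len = [Left (TruncatedBlob len (BL.length blob))]
  | otherwise            = Right blob : blobs rest
  where
    (hdr, afterHdr) = BL.splitAt 4 bs
    -- Only evaluated after the guard above has confirmed 4 header bytes.
    len             = fromIntegral (G.runGet G.getWord32be hdr) :: Int64
    (blob, rest)    = BL.splitAt len afterHdr
```

A caller can then consume the list incrementally, for example with mapM_ over blobs applied to the result of BL.readFile, and decide what to do when a Left shows up.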