Haskell 中的高效比特流

Efficient bitstreams in Haskell

在持续努力以有效地 fiddle 位(例如,请参阅此 ),最新的挑战是有效的流和位的消耗。

作为第一个简单任务,我选择在 /dev/urandom 生成的比特流中找到最长的相同比特序列。一个典型的咒语是 head -c 1000000 </dev/urandom | my-exe。实际目标是流式传输比特并解码 Elias gamma code,例如,即不是字节块或其倍数的代码。

对于这种可变长度的代码,最好有 taketakeWhilegroup 等列表操作语言。由于 BitStream.take 实际上会消耗部分 bistream,一些 monad 可能会发挥作用。

明显的起点是来自 Data.ByteString.Lazy 的惰性字节串。

一个。计算字节数

这个非常简单的 Haskell 程序的性能与 C 程序相当,正如预期的那样。

import qualified Data.ByteString.Lazy as BSL

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ BSL.length bs

乙。添加字节

一旦我开始使用 unpack 事情就会变得更糟。

main = do
    bs <- BSL.getContents
    print $ sum $ BSL.unpack bs

令人惊讶的是,Haskell 和 C 表现出几乎相同的性能。

C。相同位的最长序列

作为第一个重要任务,最长的相同位序列可以这样找到:

module Main where

import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)

splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ maximum $ length <$> (group $ bitStream bs)

惰性字节串被转换为列表 [Word8],然后使用移位将每个 Word 拆分为位,得到列表 [Bool]。这个列表列表然后用 concat 展平。获得 Bool 的(惰性)列表后,使用 group 将列表拆分为相同位的序列,然后将 length 映射到它上面。最后 maximum 给出了想要的结果。很简单,但不是很快:

# C
real    0m0.606s

# Haskell
real    0m6.062s

这个简单的实现正好慢了一个数量级。

分析显示分配了相当多的内存(解析 1MB 的输入大约需要 3GB)。不过,没有观察到大规模 space 泄漏。

我从这里开始四处寻找:

现在我不太确定该去哪里。

更新:

我想出了如何用 streaming and streaming-bytestring 做到这一点。我可能做得不对,因为结果非常糟糕。

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S

splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]

bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s

main :: IO ()
main = do
    let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
        gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
    maxLen <- S.maximum $ S.mapped S.length gs
    print $ S.fst' maxLen

这将考验您对超过几千字节的标准输入输入的耐心。探查器说它在 Streaming.Internal.>>=.loopData.Functor.Of.fmap 中花费了大量时间(输入大小的二次方)。我不太确定第一个是什么,但是 fmap 表明(?)这些 Of a b 的杂耍对我们没有任何好处,因为我们在 IO monad 中无法优化。

我还有字节加法器 here: SumBytesStream.hs, which is slightly slower than the simple lazy ByteString implementation, but still decent. Since streaming-bytestring is proclaimed 的流等效项是“bytestring io done right”,我期望更好。那我可能做的不对。

无论如何,所有这些位计算都不应该发生在 IO monad 中。但是 BSS.getContents 迫使我进入 IO monad 因为 getContents :: MonadIO m => ByteString m () 并且没有出路。

更新 2

根据@dfeuer 的建议,我在 master@HEAD 中使用了 streaming 包。这是结果。

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

Streaming.concat 的 O(n^2) 问题已经解决,但我们仍然没有接近 C 基准。

更新 3

Cirdec 的解决方案产生与 C 相当的性能。使用的构造称为 "Church encoded lists",请参阅此 SO answer or the Haskell Wiki on rank-N types

源文件:

所有源文件都可以在github上找到。 Makefile 具有 运行 实验和分析的所有不同目标。默认的 make 将只构建所有内容(首先创建一个 bin/ 目录!)然后 make time 将对 longest-seq 可执行文件进行计时。 C 可执行文件附加 -c 以区分它们。

当对流的操作融合在一起时,可以删除中间分配及其相应的开销。 GHC prelude 以 rewrite rules 的形式为惰性流提供 foldr/build 融合。一般的想法是,如果一个函数产生一个看起来像 foldr 的结果(它具有应用于 (:)[] 的类型 (a -> b -> b) -> b -> b),而另一个函数使用一个看起来像 foldr 的列表, 构造中间列表可以去掉

对于您的问题,我将构建类似的东西,但使用严格的左折叠 (foldl') 而不是 foldr。我不会使用重写规则来检测什么时候看起来像 foldl,而是使用强制列表看起来像左折叠的数据类型。

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

由于我已经开始放弃列表,所以我们将重新实现列表前奏的一部分。

可以从列表和字节串的 foldl' 函数中创建严格的左折叠。

{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)

{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)

最简单的使用示例是查找列表的长度。

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0

我们还可以映射和连接左折叠。

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)

{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)

对于您的问题,我们需要能够将单词拆分成位。

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

和一个ByteString成位

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

为了找到最长的 运行,我们将跟踪先前的值、当前的长度 运行 和最长的 运行 的长度。我们使字段严格,以便折叠的严格性防止 thunk 链在内存中累积。为状态制定严格的数据类型是控制其内存表示和评估其字段的时间的简单方法。

data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
  where
    current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
 where
   (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

我们完成了

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun $ bitStream' bs

这要快得多,但性能不如 c。

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

程序分配大约 1 Mb 以从输入中读取 1000000 个字节。

total alloc =   1,173,104 bytes  (excludes profiling overheads)

已更新github code

我找到了另一个与 C 相当的解决方案 Data.Vector.Fusion.Stream.Monadic has a stream implementation based on this Coutts, Leshchinskiy, Stewart 2007 paper。它背后的想法是使用 destroy/unfoldr 流融合。

回想一下可以方便地进行模式匹配的列表 unfoldr :: (b -> Maybe (a, b)) -> b -> [a] creates a list by repeatedly applying (unfolding) a step-forward function, starting with an initial value. A Stream is just an unfoldr function with starting state. (The Data.Vector.Fusion.Stream.Monadic library uses GADTs to create constructors for Step。我认为没有 GADT 也可以完成。)

解决方案的核心部分是 mkBitstream :: BSL.ByteString -> Stream Bool 函数,它将 BytesString 转换为 Bool 的流。基本上,我们跟踪当前 ByteString、当前字节,以及当前字节还有多少未被使用。每当一个字节用完时,另一个字节就会被砍掉 ByteString。剩下Nothing时,流为Done.

longestRun 函数直接取自@Cirdec 的解决方案。

这是练习曲:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where

import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)

type Stream a = S.Stream Identity a   -- no need for any monad, really

data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster

mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
    {-# INLINE_INNER step #-}
    step (Step bs w n) | n==0 = case (BSL.uncons bs) of
                            Nothing        -> return S.Done
                            Just (w', bs') -> return $ 
                                S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
                       | otherwise = return $ 
                                S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))


data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
    where current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
    (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
    return longest

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun (mkBitstream bs)