Haskell 中的高效比特流
Efficient bitstreams in Haskell
在持续努力以有效地 fiddle 位(例如,请参阅此 ),最新的挑战是有效的流和位的消耗。
作为第一个简单任务,我选择在 /dev/urandom
生成的比特流中找到最长的相同比特序列。一个典型的咒语是 head -c 1000000 </dev/urandom | my-exe
。实际目标是流式传输比特并解码 Elias gamma code,例如,即不是字节块或其倍数的代码。
对于这种可变长度的代码,最好有 take
、takeWhile
、group
等列表操作语言。由于 BitStream.take
实际上会消耗部分 bistream,一些 monad 可能会发挥作用。
明显的起点是来自 Data.ByteString.Lazy
的惰性字节串。
一个。计算字节数
这个非常简单的 Haskell 程序的性能与 C 程序相当,正如预期的那样。
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
乙。添加字节
一旦我开始使用 unpack
事情就会变得更糟。
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
令人惊讶的是,Haskell 和 C 表现出几乎相同的性能。
C。相同位的最长序列
作为第一个重要任务,最长的相同位序列可以这样找到:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
惰性字节串被转换为列表 [Word8]
,然后使用移位将每个 Word
拆分为位,得到列表 [Bool]
。这个列表列表然后用 concat
展平。获得 Bool
的(惰性)列表后,使用 group
将列表拆分为相同位的序列,然后将 length
映射到它上面。最后 maximum
给出了想要的结果。很简单,但不是很快:
# C
real 0m0.606s
# Haskell
real 0m6.062s
这个简单的实现正好慢了一个数量级。
分析显示分配了相当多的内存(解析 1MB 的输入大约需要 3GB)。不过,没有观察到大规模 space 泄漏。
我从这里开始四处寻找:
- 详情见
bitstream
package that promises "Fast, packed, strict bit streams (i.e. list of Bools) with semi-automatic stream fusion.". Unfortunately it is not up-to-date with the current vector
package, see here。
- 接下来,我调查
streaming
。我不太明白为什么我需要 'effectful' 流式传输来发挥一些 monad - 至少在我从提出的任务的相反方向开始之前,即编码比特流并将其写入文件。
fold
-ing 越过 ByteString
怎么样?我必须引入状态来跟踪消耗的位。这不是很好的 take
、takeWhile
、group
等语言。
现在我不太确定该去哪里。
更新:
我想出了如何用 streaming
and streaming-bytestring
做到这一点。我可能做得不对,因为结果非常糟糕。
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
这将考验您对超过几千字节的标准输入输入的耐心。探查器说它在 Streaming.Internal.>>=.loop
和 Data.Functor.Of.fmap
中花费了大量时间(输入大小的二次方)。我不太确定第一个是什么,但是 fmap
表明(?)这些 Of a b
的杂耍对我们没有任何好处,因为我们在 IO monad 中无法优化。
我还有字节加法器 here: SumBytesStream.hs
, which is slightly slower than the simple lazy ByteString
implementation, but still decent. Since streaming-bytestring
is proclaimed 的流等效项是“bytestring io done right”,我期望更好。那我可能做的不对。
无论如何,所有这些位计算都不应该发生在 IO monad 中。但是 BSS.getContents
迫使我进入 IO monad 因为 getContents :: MonadIO m => ByteString m ()
并且没有出路。
更新 2
根据@dfeuer 的建议,我在 master@HEAD 中使用了 streaming
包。这是结果。
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Streaming.concat
的 O(n^2) 问题已经解决,但我们仍然没有接近 C 基准。
更新 3
Cirdec 的解决方案产生与 C 相当的性能。使用的构造称为 "Church encoded lists",请参阅此 SO answer or the Haskell Wiki on rank-N types。
源文件:
所有源文件都可以在github上找到。 Makefile
具有 运行 实验和分析的所有不同目标。默认的 make
将只构建所有内容(首先创建一个 bin/
目录!)然后 make time
将对 longest-seq
可执行文件进行计时。 C 可执行文件附加 -c
以区分它们。
当对流的操作融合在一起时,可以删除中间分配及其相应的开销。 GHC prelude 以 rewrite rules 的形式为惰性流提供 foldr/build 融合。一般的想法是,如果一个函数产生一个看起来像 foldr 的结果(它具有应用于 (:)
和 []
的类型 (a -> b -> b) -> b -> b
),而另一个函数使用一个看起来像 foldr 的列表, 构造中间列表可以去掉
对于您的问题,我将构建类似的东西,但使用严格的左折叠 (foldl'
) 而不是 foldr。我不会使用重写规则来检测什么时候看起来像 foldl
,而是使用强制列表看起来像左折叠的数据类型。
-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}
由于我已经开始放弃列表,所以我们将重新实现列表前奏的一部分。
可以从列表和字节串的 foldl'
函数中创建严格的左折叠。
{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)
{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)
最简单的使用示例是查找列表的长度。
{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0
我们还可以映射和连接左折叠。
{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)
{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)
对于您的问题,我们需要能够将单词拆分成位。
{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte
和一个ByteString
成位
{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS
为了找到最长的 运行,我们将跟踪先前的值、当前的长度 运行 和最长的 运行 的长度。我们使字段严格,以便折叠的严格性防止 thunk 链在内存中累积。为状态制定严格的数据类型是控制其内存表示和评估其字段的时间的简单方法。
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where
current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
where
(LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)
我们完成了
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun $ bitStream' bs
这要快得多,但性能不如 c。
longest-seq-c 0m00.12s (C)
longest-seq 0m08.65s (Haskell ByteString)
longest-seq-fuse 0m00.81s (Haskell ByteString fused)
程序分配大约 1 Mb 以从输入中读取 1000000 个字节。
total alloc = 1,173,104 bytes (excludes profiling overheads)
已更新github code
我找到了另一个与 C 相当的解决方案 Data.Vector.Fusion.Stream.Monadic
has a stream implementation based on this Coutts, Leshchinskiy, Stewart 2007 paper。它背后的想法是使用 destroy/unfoldr 流融合。
回想一下可以方便地进行模式匹配的列表 unfoldr :: (b -> Maybe (a, b)) -> b -> [a]
creates a list by repeatedly applying (unfolding) a step-forward function, starting with an initial value. A Stream
is just an unfoldr
function with starting state. (The Data.Vector.Fusion.Stream.Monadic
library uses GADTs to create constructors for Step
。我认为没有 GADT 也可以完成。)
解决方案的核心部分是 mkBitstream :: BSL.ByteString -> Stream Bool
函数,它将 BytesString
转换为 Bool
的流。基本上,我们跟踪当前 ByteString
、当前字节,以及当前字节还有多少未被使用。每当一个字节用完时,另一个字节就会被砍掉 ByteString
。剩下Nothing
时,流为Done
.
longestRun
函数直接取自@Cirdec 的解决方案。
这是练习曲:
{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where
import Control.Monad.Identity (Identity)
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.Functor.Identity (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import Data.Word8 (Word8)
type Stream a = S.Stream Identity a -- no need for any monad, really
data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster
mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
{-# INLINE_INNER step #-}
step (Step bs w n) | n==0 = case (BSL.uncons bs) of
Nothing -> return S.Done
Just (w', bs') -> return $
S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
| otherwise = return $
S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
(LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
return longest
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun (mkBitstream bs)
在持续努力以有效地 fiddle 位(例如,请参阅此
作为第一个简单任务,我选择在 /dev/urandom
生成的比特流中找到最长的相同比特序列。一个典型的咒语是 head -c 1000000 </dev/urandom | my-exe
。实际目标是流式传输比特并解码 Elias gamma code,例如,即不是字节块或其倍数的代码。
对于这种可变长度的代码,最好有 take
、takeWhile
、group
等列表操作语言。由于 BitStream.take
实际上会消耗部分 bistream,一些 monad 可能会发挥作用。
明显的起点是来自 Data.ByteString.Lazy
的惰性字节串。
一个。计算字节数
这个非常简单的 Haskell 程序的性能与 C 程序相当,正如预期的那样。
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
乙。添加字节
一旦我开始使用 unpack
事情就会变得更糟。
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
令人惊讶的是,Haskell 和 C 表现出几乎相同的性能。
C。相同位的最长序列
作为第一个重要任务,最长的相同位序列可以这样找到:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
惰性字节串被转换为列表 [Word8]
,然后使用移位将每个 Word
拆分为位,得到列表 [Bool]
。这个列表列表然后用 concat
展平。获得 Bool
的(惰性)列表后,使用 group
将列表拆分为相同位的序列,然后将 length
映射到它上面。最后 maximum
给出了想要的结果。很简单,但不是很快:
# C
real 0m0.606s
# Haskell
real 0m6.062s
这个简单的实现正好慢了一个数量级。
分析显示分配了相当多的内存(解析 1MB 的输入大约需要 3GB)。不过,没有观察到大规模 space 泄漏。
我从这里开始四处寻找:
- 详情见
bitstream
package that promises "Fast, packed, strict bit streams (i.e. list of Bools) with semi-automatic stream fusion.". Unfortunately it is not up-to-date with the currentvector
package, see here。 - 接下来,我调查
streaming
。我不太明白为什么我需要 'effectful' 流式传输来发挥一些 monad - 至少在我从提出的任务的相反方向开始之前,即编码比特流并将其写入文件。 fold
-ing 越过ByteString
怎么样?我必须引入状态来跟踪消耗的位。这不是很好的take
、takeWhile
、group
等语言。
现在我不太确定该去哪里。
更新:
我想出了如何用 streaming
and streaming-bytestring
做到这一点。我可能做得不对,因为结果非常糟糕。
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (\i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
这将考验您对超过几千字节的标准输入输入的耐心。探查器说它在 Streaming.Internal.>>=.loop
和 Data.Functor.Of.fmap
中花费了大量时间(输入大小的二次方)。我不太确定第一个是什么,但是 fmap
表明(?)这些 Of a b
的杂耍对我们没有任何好处,因为我们在 IO monad 中无法优化。
我还有字节加法器 here: SumBytesStream.hs
, which is slightly slower than the simple lazy ByteString
implementation, but still decent. Since streaming-bytestring
is proclaimed 的流等效项是“bytestring io done right”,我期望更好。那我可能做的不对。
无论如何,所有这些位计算都不应该发生在 IO monad 中。但是 BSS.getContents
迫使我进入 IO monad 因为 getContents :: MonadIO m => ByteString m ()
并且没有出路。
更新 2
根据@dfeuer 的建议,我在 master@HEAD 中使用了 streaming
包。这是结果。
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Streaming.concat
的 O(n^2) 问题已经解决,但我们仍然没有接近 C 基准。
更新 3
Cirdec 的解决方案产生与 C 相当的性能。使用的构造称为 "Church encoded lists",请参阅此 SO answer or the Haskell Wiki on rank-N types。
源文件:
所有源文件都可以在github上找到。 Makefile
具有 运行 实验和分析的所有不同目标。默认的 make
将只构建所有内容(首先创建一个 bin/
目录!)然后 make time
将对 longest-seq
可执行文件进行计时。 C 可执行文件附加 -c
以区分它们。
当对流的操作融合在一起时,可以删除中间分配及其相应的开销。 GHC prelude 以 rewrite rules 的形式为惰性流提供 foldr/build 融合。一般的想法是,如果一个函数产生一个看起来像 foldr 的结果(它具有应用于 (:)
和 []
的类型 (a -> b -> b) -> b -> b
),而另一个函数使用一个看起来像 foldr 的列表, 构造中间列表可以去掉
对于您的问题,我将构建类似的东西,但使用严格的左折叠 (foldl'
) 而不是 foldr。我不会使用重写规则来检测什么时候看起来像 foldl
,而是使用强制列表看起来像左折叠的数据类型。
-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}
由于我已经开始放弃列表,所以我们将重新实现列表前奏的一部分。
可以从列表和字节串的 foldl'
函数中创建严格的左折叠。
{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)
{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)
最简单的使用示例是查找列表的长度。
{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0
我们还可以映射和连接左折叠。
{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)
{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)
对于您的问题,我们需要能够将单词拆分成位。
{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]
{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte
和一个ByteString
成位
{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS
为了找到最长的 运行,我们将跟踪先前的值、当前的长度 运行 和最长的 运行 的长度。我们使字段严格,以便折叠的严格性防止 thunk 链在内存中累积。为状态制定严格的数据类型是控制其内存表示和评估其字段的时间的简单方法。
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where
current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
where
(LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)
我们完成了
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun $ bitStream' bs
这要快得多,但性能不如 c。
longest-seq-c 0m00.12s (C)
longest-seq 0m08.65s (Haskell ByteString)
longest-seq-fuse 0m00.81s (Haskell ByteString fused)
程序分配大约 1 Mb 以从输入中读取 1000000 个字节。
total alloc = 1,173,104 bytes (excludes profiling overheads)
已更新github code
我找到了另一个与 C 相当的解决方案 Data.Vector.Fusion.Stream.Monadic
has a stream implementation based on this Coutts, Leshchinskiy, Stewart 2007 paper。它背后的想法是使用 destroy/unfoldr 流融合。
回想一下可以方便地进行模式匹配的列表 unfoldr :: (b -> Maybe (a, b)) -> b -> [a]
creates a list by repeatedly applying (unfolding) a step-forward function, starting with an initial value. A Stream
is just an unfoldr
function with starting state. (The Data.Vector.Fusion.Stream.Monadic
library uses GADTs to create constructors for Step
。我认为没有 GADT 也可以完成。)
解决方案的核心部分是 mkBitstream :: BSL.ByteString -> Stream Bool
函数,它将 BytesString
转换为 Bool
的流。基本上,我们跟踪当前 ByteString
、当前字节,以及当前字节还有多少未被使用。每当一个字节用完时,另一个字节就会被砍掉 ByteString
。剩下Nothing
时,流为Done
.
longestRun
函数直接取自@Cirdec 的解决方案。
这是练习曲:
{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where
import Control.Monad.Identity (Identity)
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.Functor.Identity (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import Data.Word8 (Word8)
type Stream a = S.Stream Identity a -- no need for any monad, really
data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster
mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
{-# INLINE_INNER step #-}
step (Step bs w n) | n==0 = case (BSL.uncons bs) of
Nothing -> return S.Done
Just (w', bs') -> return $
S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
| otherwise = return $
S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))
data LongestRun = LongestRun !Bool !Int !Int
{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
where current = if x == previous then run + 1 else 1
{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
(LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
return longest
main :: IO ()
main = do
bs <- BSL.getContents
print $ longestRun (mkBitstream bs)