网络上的高效二进制 I/O
Efficient binary I/O over a network
我正在尝试编写一个使用二进制网络协议的 Haskell 小程序,但我遇到了很多困难。
看来二进制数据应该存储为ByteString
.
问题:我应该只 hGet
/ hPut
单个多字节整数,还是构建一个大的 ByteString
并使用它会更高效?
看来 binary
包在这里应该有用。但是,binary
仅处理 lazy ByteString
值。
问题: lazy ByteString
上的 hGet
是否真的严格读取了指定的字节数?或者它是否尝试做一些懒惰的事情 I/O? (我不想想偷懒I/O!)
问题:为什么文档没有指定这个?
代码看起来会包含很多 "get the next integer, compare it to this value, if no then throw an error, otherwise continue to the next step..." 我不确定如何在不编写意大利面条代码的情况下清晰地构建它。
总而言之,我想做的事情很简单,但我似乎正在努力寻找一种使代码 看起来 简单的方法。也许我只是想多了,错过了一些明显的东西......
回复问题 1...
如果句柄配置为NoBuffering
,每次hPutStr
调用将生成一个写系统调用。对于大量的小写操作,这将导致巨大的性能损失。例如,请参阅此 SO 答案以了解一些基准测试:
另一方面,如果句柄启用了缓冲,您将需要显式刷新句柄以确保发送缓冲数据。
我假设您使用的是像 TCP 这样的流协议。使用 UDP,您显然必须将每条消息作为一个原子单元来形成和发送。
关于问题 #2...
阅读代码,似乎 hGet
对于惰性字节串将从句柄中读取 defaultChunkSize
的块,大约 32k。
更新:在这种情况下,hGet 似乎不 执行惰性 IO。这是一些代码来测试这个。
供稿:
#!/usr/bin/env perl
$| = 1;
my $c = 0;
my $k = "1" x 1024;
while (1) {
syswrite(STDOUT, $k);
$c++;
print STDERR "wrote 1k count = $c\n";
}
Test.hs:
import qualified Data.ByteString.Lazy as LBS
import System.IO
main = do
s <- LBS.hGet stdin 320000
let s2 = LBS.take 10 s
print $ ("Length s2 = ", s2)
运行 perl feed | runhaskell Test.hs
很明显,Haskell 程序要求 perl 程序提供全部 320k,即使它只使用前 10 个字节。
TCP 要求应用程序提供自己的消息边界标记。标记消息边界的一个简单协议是发送数据块的长度、数据块以及是否有剩余块是同一消息的一部分。保存消息边界信息的 header 的最佳大小取决于消息大小的分布。
开发我们自己的小消息协议,我们将为我们的 header 使用两个字节。字节中的最高有效位(被视为 Word16
)将保存消息中是否还有剩余块。剩余的 15 位将保存消息的字节长度。这将允许最大 32k 的块大小,比典型的 TCP 数据包大。如果消息通常非常小,尤其是小于 127 字节时,两个字节 header 将是 less-than-optimal。
我们将使用 network-simple for the networking portion of our code. We'll serialize or deserialize messages with the binary 包,其中 encode
s 和 decode
s 往返于惰性 ByteString
s。
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as B
import Network.Simple.TCP
import Data.Bits
import Data.Binary
import Data.Functor
import Control.Monad.IO.Class
我们需要的第一个实用程序是能够将 Word16
header 写入严格的 ByteString
并再次读回它们。我们将按 big-endian 顺序编写它们。或者,这些可以根据 Word16
.
的 Binary
实例来编写
writeBE :: Word16 -> B.ByteString
writeBE x = B.pack . map fromIntegral $ [(x .&. 0xFF00) `shiftR` 8, x .&. 0xFF]
readBE :: B.ByteString -> Maybe Word16
readBE s =
case map fromIntegral . B.unpack $ s of
[w1, w0] -> Just $ w1 `shiftL` 8 .|. w0
_ -> Nothing
主要的挑战将是发送和接收二进制包强加给我们的惰性 ByteString
s。由于我们一次最多只能发送 32k 字节,因此我们需要能够 rechunk
将惰性字节串分成总已知长度不超过我们的最大值的块。单个块可能已经超过最大值;任何不适合我们新块的块都被分成多个块。
rechunk :: Int -> [B.ByteString] -> [(Int, [B.ByteString])]
rechunk n = go [] 0 . filter (not . B.null)
where
go acc l [] = [(l, reverse acc)]
go acc l (x:xs) =
let
lx = B.length x
l' = lx + l
in
if l' <= n
then go (x:acc) l' xs
else
let (x0, x1) = B.splitAt (n-l) x
in (n, reverse (x0:acc)) : go [] 0 (x1:xs)
recvExactly
将循环直到收到我们请求的所有字节。
recvExactly :: MonadIO m => Socket -> Int -> m (Maybe [B.ByteString])
recvExactly s toRead = go [] toRead
where
go acc toRead = do
body <- recv s toRead
maybe (return Nothing) (go' acc toRead) body
go' acc toRead body =
if B.length body < toRead
then go (body:acc) (toRead - B.length body)
else return . Just . reverse $ acc
发送惰性 ByteString
包括将其分成我们知道可以发送的大小的块,并发送每个块以及保存大小的 header 以及是否还有更多块。
sendLazyBS :: (MonadIO m) => Socket -> L.ByteString -> m ()
sendLazyBS s = go . rechunk maxChunk . L.toChunks
where
maxChunk = 0x7FFF
go [] = return ()
go ((li, ss):xs) = do
let l = fromIntegral li
let h = writeBE $ if null xs then l else l .|. 0x8000
sendMany s (h:ss)
go xs
接收延迟 ByteString
包括读取两个字节 header,读取 header 指示大小的块,并继续读取 header 表示还有更多块。
recvLazyBS :: (MonadIO m, Functor m) => Socket -> m (Maybe L.ByteString)
recvLazyBS s = fmap L.fromChunks <$> go []
where
go acc = do
header <- recvExactly s 2
maybe (return Nothing) (go' acc) (header >>= readBE . B.concat)
go' acc h = do
body <- recvExactly s . fromIntegral $ h .&. 0x7FFF
let next = if h .&. 0x8000 /= 0
then go
else return . Just . concat . reverse
maybe (return Nothing) (next . (:acc) ) body
发送或接收具有 Binary
实例的消息只是发送 encode
d 惰性 ByteString
或接收惰性 ByteString
和 decode
正在阅读它。
sendBinary :: (MonadIO m, Binary a) => Socket -> a -> m ()
sendBinary s = sendLazyBS s . encode
recvBinary :: (MonadIO m, Binary a, Functor m) => Socket -> m (Maybe a)
recvBinary s = d . fmap decodeOrFail <$> recvLazyBS s
where
d (Just (Right (_, _, x))) = Just x
d _ = Nothing
我正在尝试编写一个使用二进制网络协议的 Haskell 小程序,但我遇到了很多困难。
看来二进制数据应该存储为ByteString
.
问题:我应该只 hGet
/ hPut
单个多字节整数,还是构建一个大的 ByteString
并使用它会更高效?
看来 binary
包在这里应该有用。但是,binary
仅处理 lazy ByteString
值。
问题: lazy ByteString
上的 hGet
是否真的严格读取了指定的字节数?或者它是否尝试做一些懒惰的事情 I/O? (我不想想偷懒I/O!)
问题:为什么文档没有指定这个?
代码看起来会包含很多 "get the next integer, compare it to this value, if no then throw an error, otherwise continue to the next step..." 我不确定如何在不编写意大利面条代码的情况下清晰地构建它。
总而言之,我想做的事情很简单,但我似乎正在努力寻找一种使代码 看起来 简单的方法。也许我只是想多了,错过了一些明显的东西......
回复问题 1...
如果句柄配置为NoBuffering
,每次hPutStr
调用将生成一个写系统调用。对于大量的小写操作,这将导致巨大的性能损失。例如,请参阅此 SO 答案以了解一些基准测试:
另一方面,如果句柄启用了缓冲,您将需要显式刷新句柄以确保发送缓冲数据。
我假设您使用的是像 TCP 这样的流协议。使用 UDP,您显然必须将每条消息作为一个原子单元来形成和发送。
关于问题 #2...
阅读代码,似乎 hGet
对于惰性字节串将从句柄中读取 defaultChunkSize
的块,大约 32k。
更新:在这种情况下,hGet 似乎不 执行惰性 IO。这是一些代码来测试这个。 供稿:
#!/usr/bin/env perl
$| = 1;
my $c = 0;
my $k = "1" x 1024;
while (1) {
syswrite(STDOUT, $k);
$c++;
print STDERR "wrote 1k count = $c\n";
}
Test.hs:
import qualified Data.ByteString.Lazy as LBS
import System.IO
main = do
s <- LBS.hGet stdin 320000
let s2 = LBS.take 10 s
print $ ("Length s2 = ", s2)
运行 perl feed | runhaskell Test.hs
很明显,Haskell 程序要求 perl 程序提供全部 320k,即使它只使用前 10 个字节。
TCP 要求应用程序提供自己的消息边界标记。标记消息边界的一个简单协议是发送数据块的长度、数据块以及是否有剩余块是同一消息的一部分。保存消息边界信息的 header 的最佳大小取决于消息大小的分布。
开发我们自己的小消息协议,我们将为我们的 header 使用两个字节。字节中的最高有效位(被视为 Word16
)将保存消息中是否还有剩余块。剩余的 15 位将保存消息的字节长度。这将允许最大 32k 的块大小,比典型的 TCP 数据包大。如果消息通常非常小,尤其是小于 127 字节时,两个字节 header 将是 less-than-optimal。
我们将使用 network-simple for the networking portion of our code. We'll serialize or deserialize messages with the binary 包,其中 encode
s 和 decode
s 往返于惰性 ByteString
s。
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as B
import Network.Simple.TCP
import Data.Bits
import Data.Binary
import Data.Functor
import Control.Monad.IO.Class
我们需要的第一个实用程序是能够将 Word16
header 写入严格的 ByteString
并再次读回它们。我们将按 big-endian 顺序编写它们。或者,这些可以根据 Word16
.
Binary
实例来编写
writeBE :: Word16 -> B.ByteString
writeBE x = B.pack . map fromIntegral $ [(x .&. 0xFF00) `shiftR` 8, x .&. 0xFF]
readBE :: B.ByteString -> Maybe Word16
readBE s =
case map fromIntegral . B.unpack $ s of
[w1, w0] -> Just $ w1 `shiftL` 8 .|. w0
_ -> Nothing
主要的挑战将是发送和接收二进制包强加给我们的惰性 ByteString
s。由于我们一次最多只能发送 32k 字节,因此我们需要能够 rechunk
将惰性字节串分成总已知长度不超过我们的最大值的块。单个块可能已经超过最大值;任何不适合我们新块的块都被分成多个块。
rechunk :: Int -> [B.ByteString] -> [(Int, [B.ByteString])]
rechunk n = go [] 0 . filter (not . B.null)
where
go acc l [] = [(l, reverse acc)]
go acc l (x:xs) =
let
lx = B.length x
l' = lx + l
in
if l' <= n
then go (x:acc) l' xs
else
let (x0, x1) = B.splitAt (n-l) x
in (n, reverse (x0:acc)) : go [] 0 (x1:xs)
recvExactly
将循环直到收到我们请求的所有字节。
recvExactly :: MonadIO m => Socket -> Int -> m (Maybe [B.ByteString])
recvExactly s toRead = go [] toRead
where
go acc toRead = do
body <- recv s toRead
maybe (return Nothing) (go' acc toRead) body
go' acc toRead body =
if B.length body < toRead
then go (body:acc) (toRead - B.length body)
else return . Just . reverse $ acc
发送惰性 ByteString
包括将其分成我们知道可以发送的大小的块,并发送每个块以及保存大小的 header 以及是否还有更多块。
sendLazyBS :: (MonadIO m) => Socket -> L.ByteString -> m ()
sendLazyBS s = go . rechunk maxChunk . L.toChunks
where
maxChunk = 0x7FFF
go [] = return ()
go ((li, ss):xs) = do
let l = fromIntegral li
let h = writeBE $ if null xs then l else l .|. 0x8000
sendMany s (h:ss)
go xs
接收延迟 ByteString
包括读取两个字节 header,读取 header 指示大小的块,并继续读取 header 表示还有更多块。
recvLazyBS :: (MonadIO m, Functor m) => Socket -> m (Maybe L.ByteString)
recvLazyBS s = fmap L.fromChunks <$> go []
where
go acc = do
header <- recvExactly s 2
maybe (return Nothing) (go' acc) (header >>= readBE . B.concat)
go' acc h = do
body <- recvExactly s . fromIntegral $ h .&. 0x7FFF
let next = if h .&. 0x8000 /= 0
then go
else return . Just . concat . reverse
maybe (return Nothing) (next . (:acc) ) body
发送或接收具有 Binary
实例的消息只是发送 encode
d 惰性 ByteString
或接收惰性 ByteString
和 decode
正在阅读它。
sendBinary :: (MonadIO m, Binary a) => Socket -> a -> m ()
sendBinary s = sendLazyBS s . encode
recvBinary :: (MonadIO m, Binary a, Functor m) => Socket -> m (Maybe a)
recvBinary s = d . fmap decodeOrFail <$> recvLazyBS s
where
d (Just (Right (_, _, x))) = Just x
d _ = Nothing