网络上的高效二进制 I/O

Efficient binary I/O over a network

我正在尝试编写一个使用二进制网络协议的 Haskell 小程序,但我遇到了很多困难。

看来二进制数据应该存储为ByteString.

问题:我应该只 hGet / hPut 单个多字节整数,还是构建一个大的 ByteString 并使用它会更高效?

看来 binary 包在这里应该有用。但是,binary 仅处理 lazy ByteString 值。

问题: lazy ByteString 上的 hGet 是否真的严格读取了指定的字节数?或者它是否尝试做一些懒惰的事情 I/O? (我不想想偷懒I/O!)

问题:为什么文档没有指定这个?

代码看起来会包含很多 "get the next integer, compare it to this value, if no then throw an error, otherwise continue to the next step..." 我不确定如何在不编写意大利面条代码的情况下清晰地构建它。

总而言之,我想做的事情很简单,但我似乎正在努力寻找一种使代码 看起来 简单的方法。也许我只是想多了,错过了一些明显的东西......

回复问题 1...

如果句柄配置为NoBuffering,每次hPutStr调用将生成一个写系统调用。对于大量的小写操作,这将导致巨大的性能损失。例如,请参阅此 SO 答案以了解一些基准测试:

另一方面,如果句柄启用了缓冲,您将需要显式刷新句柄以确保发送缓冲数据。

我假设您使用的是像 TCP 这样的流协议。使用 UDP,您显然必须将每条消息作为一个原子单元来形成和发送。

关于问题 #2...

阅读代码,似乎 hGet 对于惰性字节串将从句柄中读取 defaultChunkSize 的块,大约 32k。

更新:在这种情况下,hGet 似乎 执行惰性 IO。这是一些代码来测试这个。 供稿:

#!/usr/bin/env perl
$| = 1;
my $c = 0;
my $k = "1" x 1024;
while (1) {
  syswrite(STDOUT, $k);
  $c++;
  print STDERR "wrote 1k count = $c\n";
}

Test.hs:

import qualified Data.ByteString.Lazy as LBS
import System.IO

main = do
  s <- LBS.hGet stdin 320000
  let s2 = LBS.take 10 s
  print $ ("Length s2 = ", s2)

运行 perl feed | runhaskell Test.hs 很明显,Haskell 程序要求 perl 程序提供全部 320k,即使它只使用前 10 个字节。

TCP 要求应用程序提供自己的消息边界标记。标记消息边界的一个简单协议是发送数据块的长度、数据块以及是否有剩余块是同一消息的一部分。保存消息边界信息的 header 的最佳大小取决于消息大小的分布。

开发我们自己的小消息协议,我们将为我们的 header 使用两个字节。字节中的最高有效位(被视为 Word16)将保存消息中是否还有剩余块。剩余的 15 位将保存消息的字节长度。这将允许最大 32k 的块大小,比典型的 TCP 数据包大。如果消息通常非常小,尤其是小于 127 字节时,两个字节 header 将是 less-than-optimal。

我们将使用 network-simple for the networking portion of our code. We'll serialize or deserialize messages with the binary 包,其中 encodes 和 decodes 往返于惰性 ByteStrings。

import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as B

import Network.Simple.TCP 
import Data.Bits
import Data.Binary
import Data.Functor
import Control.Monad.IO.Class

我们需要的第一个实用程序是能够将 Word16 header 写入严格的 ByteString 并再次读回它们。我们将按 big-endian 顺序编写它们。或者,这些可以根据 Word16.

Binary 实例来编写
writeBE :: Word16 -> B.ByteString
writeBE x = B.pack . map fromIntegral $ [(x .&. 0xFF00) `shiftR` 8, x .&. 0xFF]

readBE :: B.ByteString -> Maybe Word16
readBE s =
    case map fromIntegral . B.unpack $ s of
        [w1, w0] -> Just $ w1 `shiftL` 8 .|. w0
        _        -> Nothing

主要的挑战将是发送和接收二进制包强加给我们的惰性 ByteStrings。由于我们一次最多只能发送 32k 字节,因此我们需要能够 rechunk 将惰性字节串分成总已知长度不超过我们的最大值的块。单个块可能已经超过最大值;任何不适合我们新块的块都被分成多个块。

rechunk :: Int -> [B.ByteString] -> [(Int, [B.ByteString])]
rechunk n = go [] 0 . filter (not . B.null)
    where
        go acc l []     = [(l, reverse acc)]
        go acc l (x:xs) =
            let
                lx = B.length x
                l' = lx + l
            in
                if l' <= n
                then go (x:acc) l' xs
                else
                    let (x0, x1) = B.splitAt (n-l) x
                    in (n, reverse (x0:acc)) : go [] 0 (x1:xs)

recvExactly 将循环直到收到我们请求的所有字节。

recvExactly :: MonadIO m => Socket -> Int -> m (Maybe [B.ByteString])
recvExactly s toRead = go [] toRead
    where
        go acc toRead = do
            body <- recv s toRead
            maybe (return Nothing) (go' acc toRead) body
        go' acc toRead body =
            if B.length body < toRead
            then go (body:acc) (toRead - B.length body)
            else return . Just . reverse $ acc

发送惰性 ByteString 包括将其分成我们知道可以发送的大小的块,并发送每个块以及保存大小的 header 以及是否还有更多块。

sendLazyBS :: (MonadIO m) => Socket -> L.ByteString -> m ()
sendLazyBS s = go . rechunk maxChunk . L.toChunks
    where
        maxChunk = 0x7FFF
        go [] = return ()
        go ((li, ss):xs) = do
            let l = fromIntegral li
            let h = writeBE $ if null xs then l else l .|. 0x8000
            sendMany s (h:ss)
            go xs

接收延迟 ByteString 包括读取两个字节 header,读取 header 指示大小的块,并继续读取 header 表示还有更多块。

recvLazyBS :: (MonadIO m, Functor m) => Socket -> m (Maybe L.ByteString)
recvLazyBS s = fmap L.fromChunks <$> go [] 
    where
        go acc = do
            header <- recvExactly s 2
            maybe (return Nothing) (go' acc) (header >>= readBE . B.concat)
        go' acc h = do
            body <- recvExactly s . fromIntegral $ h .&. 0x7FFF
            let next = if h .&. 0x8000 /= 0
                       then go
                       else return . Just . concat . reverse
            maybe (return Nothing) (next . (:acc) ) body     

发送或接收具有 Binary 实例的消息只是发送 encoded 惰性 ByteString 或接收惰性 ByteStringdecode正在阅读它。

sendBinary :: (MonadIO m, Binary a) => Socket -> a -> m ()
sendBinary s = sendLazyBS s . encode

recvBinary :: (MonadIO m, Binary a, Functor m) => Socket -> m (Maybe a)
recvBinary s = d . fmap decodeOrFail <$> recvLazyBS s
    where
        d (Just (Right (_, _, x))) = Just x
        d _                        = Nothing