使用 Conduit 分块数据

Chunk data with Conduit

这是一个 conduit 组合器的示例,当从上游接收到完整消息时,它应该 yield 下游:

import qualified Data.ByteString as BS
import Data.Conduit
import Data.Conduit.Combinators
import Data.Conduit.Network

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if (BS.isSuffixOf "|" x)
        then yield (BS.init x) >> loop
        else leftover x

服务器代码本身如下所示:

main :: IO ()
main = do
  runTCPServer (serverSettings 5000 "!4") $ \ appData -> runConduit $
    (appSource appData)
    .| message
    .| (appSink appData)

出于某种原因 telnet 127.0.0.1 5000 发送任何消息后断开连接:

telnet 127.0.0.1 5000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
123|
Connection closed by foreign host.

请指教,我这里做错了什么?

更新

更重要的是,我在这里尝试做的是等待完成信号 |,然后 yield 下游的完整消息。这是 message 组合子的演变:

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> do
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> leftover x -- problem is in this leftover
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs
      message

我的想法是,如果上游组合器没有任何消息,则必须等到有消息时,它才能向下游发送完整的消息。但是在上面的 message 组合器中, conduit 开始在 CPU 上旋转很多 leftover 调用。

go 中打印 x 进行调试。

  ...
  go x = do
    liftIO (Prelude.print x)
    if ...

套接字接收到一个以\r\n结尾的字节串,所以你转到else分支,终止会话。

终于发现在基本情况下有必要 await 而不是 leftover。以下是 message 组合器的工作方式:

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> process input >> message
  where
    process input =
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> do
          minput <- await
          case minput of
            Nothing -> return ()
            Just newInput -> process $ BS.concat [x, newInput]
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs

A bit of boilerplate that can probably be cleaned up, but it works.