Haskell 中处理惰性输入通道的惯用方式是什么

What's an idiomatic way of handling a lazy input channel in Haskell

我正在实现一个 IRC 机器人,因为我使用 OpenSSL.Session 通过 SSL 连接,所以我使用 lazyRead 函数从套接字读取数据。在连接的初始阶段,我需要按顺序执行几件事:nick 协商、nickserv 识别、加入频道等),因此涉及一些状态。现在我想出了以下内容:

data ConnectionState = Initial | NickIdentification | Connected

listen :: SSL.SSL -> IO ()
listen ssl = do
  lines <- BL.lines `fmap` SSL.lazyRead ssl
  evalStateT (mapM_ (processLine ssl) lines) Initial

processLine :: SSL.SSL -> BL.ByteString -> StateT ConnectionState IO ()
processLine ssl line = do case message of
                            Just a -> processMessage ssl a
                            Nothing -> return ()
  where message = IRC.decode $ BL.toStrict line

processMessage :: SSL.SSL -> IRC.Message -> StateT ConnectionState IO ()
processMessage ssl m = do
    state <- S.get
    case state of
      Initial -> when (IRC.msg_command m == "376") $ do
        liftIO $ putStrLn "connected!"
        liftIO $ privmsg ssl "NickServ" ("identify " ++ nick_password)
        S.put NickIdentification
      NickIdentification -> do
        when (identified m) $ do
          liftIO $ putStrLn "identified!"
          liftIO $ joinChannel ssl chan
          S.put Connected
      Connected -> return ()
    liftIO $ print m
    when (IRC.msg_command m == "PING") $ (liftIO . pong . mconcat . map show) (IRC.msg_params m)

因此,当我进入 "Connected" 状态时,我仍然会通过 case 语句结束,即使它只是真正需要初始化连接。另一个问题是添加嵌套的 StateT 会非常痛苦。

其他方法是用一些自定义的东西替换 mapM 以仅处理线路直到我们连接,然后在其余部分开始另一个循环。这将需要跟踪列表中剩余的内容或再次调用 SSL.lazyRead(这还不错)。

另一种解决方案是将剩余的线列表保留在状态中,并在需要时绘制线,类似于 getLine

在这种情况下最好做什么? Haskell 的懒惰会导致我们在状态停止更新后直接进入 Connected 的情况还是 case 总是严格的?

您可以使用 pipes 中的 Pipe 类型。诀窍在于,您可以在 Pipe.


这是 Pipe 的样子:

stateful :: Pipe ByteString ByteString IO r
stateful = do
    msg <- await
    if (IRC.msg_command msg == "376")
        then do
            liftIO $ putStrLn "connected!"
            liftIO $ privmsg ssl "NickServ" ("identify " ++ nick_password)
            yield msg
        else stateful

nick :: Pipe ByteString ByteString IO r
nick = do
    msg <- await
    if identified msg
        then do
            liftIO $ putStrLn "identified!"
            liftIO $ joinChannel ssl chan
            yield msg
            cat  -- Forward the remaining input to output indefinitely 
        else nick

stateful 管道对应于 processMessage 函数的有状态部分。它处理初始化和身份验证,但通过重新 yielding msg.


然后您可以使用 for:

遍历这 Pipe yield 秒的每条消息
processMessage :: Consumer ByteString IO r
processMessage = for stateful $ \msg -> do
    liftIO $ print m
    when (IRC.msg_command m == "PING") $ (liftIO . pong . mconcat . map show) (IRC.msg_params m)

现在您所需要的只是 ByteString 行的来源以提供给 processMessage。您可以使用以下 Producer:

lines :: Producer ByteString IO ()
lines = do
    bs <- liftIO (ByteString.getLine)
    if ByteString.null bs
        then return ()
        else do
            yield bs

然后您可以将 lines 连接到 processMessage 和 运行 他们:

runEffect (lines >-> processMessage) :: IO ()

请注意,lines 生产者不使用惰性 IO。即使你使用严格的 ByteString 模块,它也会工作,但整个程序的行为仍然是惰性的。

如果您想详细了解 pipes 的工作原理,可以阅读 the pipes tutorial