有没有办法让管道从多个来源获取数据而不阻塞其中任何一个?

Is there a way to have a conduit take data from multiple sources without blocking any of them?

我正在写一个服务器,其中一个要求是它需要能够向客户端推送数据,而无需客户端直接请求数据。我正在使用管道,但感觉这超出了管道的能力。我 运行 遇到的问题是,似乎没有办法判断套接字是否有可用数据,等待将阻止执行,直到有可用数据。假设我有以下功能

getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket

然后我将管道与 Conduit.Network 库中的源和接收器连接在一起[=13​​=]

appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData

现在,我从管道外部引入一个数据源,我想将该数据合并到管道中。例如,如果这是一个聊天服务器,则外部数据将是其他客户端发送的消息。问题是无论我尝试在何处引入这些外部数据,它都会被等待调用阻塞。本质上,我最终会得到如下所示的代码。

yield processOutsideData --deal with the outside data
data <- await            --await data from upstream

处理更多外部数据的唯一方法是上游组件产生某些东西,但上游只有在从客户端获取数据时才会产生,这正是我试图避免的。我试过使用多线程和 TChan 来解决这个问题,但似乎 appSource 和 appSink 必须在同一个线程中使用,否则我会从 recv 得到无效的文件描述符异常(这是有道理的)。

但是,如果套接字源和接收器 运行 在同一个线程中,我再次 运行 进入等待阻塞的问题,我无法检查数据是否可用从插座。在这一点上,我好像碰壁了。

但我真的很喜欢使用导管,并且更愿意继续使用它们。所以我的问题是:有没有办法通过管道实现我想要实现的目标?

Michael Snoyman 的 conduit network examples use concurrency。 telnet 客户端示例 运行 一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad            (liftM, void)
import Data.ByteString          (ByteString)
import Data.ByteString.Char8    (unpack)
import Data.Conduit.Network
import Data.String              (IsString, fromString)
import Network                  (withSocketsDo)

getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine

putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack

main :: IO ()
main = withSocketsDo $
    runTCPClient (clientSettings 4000 "localhost") $ \server ->
        void $ concurrently
            (getLines $$ appSink server)
            (appSource server $$ putLines)

我们可以在服务器上做同样的事情。创建一个STM通道,将接收到的数据写入通道,并将通道中的数据发送给客户端。这使用 stm-conduit 包的简单包装器围绕 STM 通道,sourceTBMChansinkTBMChan.

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async       (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad                  (void)
import Control.Monad.STM              (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan            (sourceTBMChan, sinkTBMChan)
import Network                        (withSocketsDo)

main :: IO ()
main = withSocketsDo $ do
    channel <- atomically $ newTBMChan 10
    runTCPServer (serverSettings 4000 "*") $ \server ->
        void $ concurrently
            (appSource server $$ sinkTBMChan channel False)
            (sourceTBMChan channel $$ appSink server)

如果我们 运行 服务器只连接了一个客户端,它会回显客户端发送的内容。

----------
| a      | (sent)
| a      | (received)
| b      | (sent)
| b      | (received)
| c      | (sent)
| c      | (received) 
----------

如果我们 运行 连接了多个客户端的服务器,消息将在客户端之间分发,每个客户端获取一条消息。

----------             ----------
| 1      | (sent)      | 1      | (received)
| 2      | (sent)      | 3      | (received)
| 2      | (received)  |        |
| 3      | (sent)      |        |
|        |             |        |
|        |             |        |
----------             ----------

此示例不处理客户端关闭连接时要执行的操作。