有没有办法让管道从多个来源获取数据而不阻塞其中任何一个?
Is there a way to have a conduit take data from multiple sources without blocking any of them?
我正在写一个服务器,其中一个要求是它需要能够向客户端推送数据,而无需客户端直接请求数据。我正在使用管道,但感觉这超出了管道的能力。我 运行 遇到的问题是,似乎没有办法判断套接字是否有可用数据,等待将阻止执行,直到有可用数据。假设我有以下功能
getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket
然后我将管道与 Conduit.Network 库中的源和接收器连接在一起[=13=]
appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData
现在,我从管道外部引入一个数据源,我想将该数据合并到管道中。例如,如果这是一个聊天服务器,则外部数据将是其他客户端发送的消息。问题是无论我尝试在何处引入这些外部数据,它都会被等待调用阻塞。本质上,我最终会得到如下所示的代码。
yield processOutsideData --deal with the outside data
data <- await --await data from upstream
处理更多外部数据的唯一方法是上游组件产生某些东西,但上游只有在从客户端获取数据时才会产生,这正是我试图避免的。我试过使用多线程和 TChan 来解决这个问题,但似乎 appSource 和 appSink 必须在同一个线程中使用,否则我会从 recv 得到无效的文件描述符异常(这是有道理的)。
但是,如果套接字源和接收器 运行 在同一个线程中,我再次 运行 进入等待阻塞的问题,我无法检查数据是否可用从插座。在这一点上,我好像碰壁了。
但我真的很喜欢使用导管,并且更愿意继续使用它们。所以我的问题是:有没有办法通过管道实现我想要实现的目标?
Michael Snoyman 的 conduit network examples use concurrency。 telnet 客户端示例 运行 一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad (liftM, void)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (unpack)
import Data.Conduit.Network
import Data.String (IsString, fromString)
import Network (withSocketsDo)
getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine
putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack
main :: IO ()
main = withSocketsDo $
runTCPClient (clientSettings 4000 "localhost") $ \server ->
void $ concurrently
(getLines $$ appSink server)
(appSource server $$ putLines)
我们可以在服务器上做同样的事情。创建一个STM通道,将接收到的数据写入通道,并将通道中的数据发送给客户端。这使用 stm-conduit 包的简单包装器围绕 STM 通道,sourceTBMChan
和 sinkTBMChan
.
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan (sourceTBMChan, sinkTBMChan)
import Network (withSocketsDo)
main :: IO ()
main = withSocketsDo $ do
channel <- atomically $ newTBMChan 10
runTCPServer (serverSettings 4000 "*") $ \server ->
void $ concurrently
(appSource server $$ sinkTBMChan channel False)
(sourceTBMChan channel $$ appSink server)
如果我们 运行 服务器只连接了一个客户端,它会回显客户端发送的内容。
----------
| a | (sent)
| a | (received)
| b | (sent)
| b | (received)
| c | (sent)
| c | (received)
----------
如果我们 运行 连接了多个客户端的服务器,消息将在客户端之间分发,每个客户端获取一条消息。
---------- ----------
| 1 | (sent) | 1 | (received)
| 2 | (sent) | 3 | (received)
| 2 | (received) | |
| 3 | (sent) | |
| | | |
| | | |
---------- ----------
此示例不处理客户端关闭连接时要执行的操作。
我正在写一个服务器,其中一个要求是它需要能够向客户端推送数据,而无需客户端直接请求数据。我正在使用管道,但感觉这超出了管道的能力。我 运行 遇到的问题是,似乎没有办法判断套接字是否有可用数据,等待将阻止执行,直到有可用数据。假设我有以下功能
getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket
然后我将管道与 Conduit.Network 库中的源和接收器连接在一起[=13=]
appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData
现在,我从管道外部引入一个数据源,我想将该数据合并到管道中。例如,如果这是一个聊天服务器,则外部数据将是其他客户端发送的消息。问题是无论我尝试在何处引入这些外部数据,它都会被等待调用阻塞。本质上,我最终会得到如下所示的代码。
yield processOutsideData --deal with the outside data
data <- await --await data from upstream
处理更多外部数据的唯一方法是上游组件产生某些东西,但上游只有在从客户端获取数据时才会产生,这正是我试图避免的。我试过使用多线程和 TChan 来解决这个问题,但似乎 appSource 和 appSink 必须在同一个线程中使用,否则我会从 recv 得到无效的文件描述符异常(这是有道理的)。
但是,如果套接字源和接收器 运行 在同一个线程中,我再次 运行 进入等待阻塞的问题,我无法检查数据是否可用从插座。在这一点上,我好像碰壁了。
但我真的很喜欢使用导管,并且更愿意继续使用它们。所以我的问题是:有没有办法通过管道实现我想要实现的目标?
Michael Snoyman 的 conduit network examples use concurrency。 telnet 客户端示例 运行 一个线程用于发送输入,另一个线程用于显示接收到的内容。我已经调整它来发送和接收整行
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad (liftM, void)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (unpack)
import Data.Conduit.Network
import Data.String (IsString, fromString)
import Network (withSocketsDo)
getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine
putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack
main :: IO ()
main = withSocketsDo $
runTCPClient (clientSettings 4000 "localhost") $ \server ->
void $ concurrently
(getLines $$ appSink server)
(appSource server $$ putLines)
我们可以在服务器上做同样的事情。创建一个STM通道,将接收到的数据写入通道,并将通道中的数据发送给客户端。这使用 stm-conduit 包的简单包装器围绕 STM 通道,sourceTBMChan
和 sinkTBMChan
.
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan (sourceTBMChan, sinkTBMChan)
import Network (withSocketsDo)
main :: IO ()
main = withSocketsDo $ do
channel <- atomically $ newTBMChan 10
runTCPServer (serverSettings 4000 "*") $ \server ->
void $ concurrently
(appSource server $$ sinkTBMChan channel False)
(sourceTBMChan channel $$ appSink server)
如果我们 运行 服务器只连接了一个客户端,它会回显客户端发送的内容。
----------
| a | (sent)
| a | (received)
| b | (sent)
| b | (received)
| c | (sent)
| c | (received)
----------
如果我们 运行 连接了多个客户端的服务器,消息将在客户端之间分发,每个客户端获取一条消息。
---------- ----------
| 1 | (sent) | 1 | (received)
| 2 | (sent) | 3 | (received)
| 2 | (received) | |
| 3 | (sent) | |
| | | |
| | | |
---------- ----------
此示例不处理客户端关闭连接时要执行的操作。