无法通过 stdin 将数据传递给使用 conduit-extra 生成的进程
Can't pass data via stdin to process spawned with conduit-extra
在我的程序中,我正在启动外部进程并通过 stdin
和 stdout
与其通信。我从 STM
s TQueue
开始通过管道(生产者)提供输入。在我决定升级 lts 版本之前,它一直很有效。它适用于 lts <= 8.24。
这是重现我的问题的最小化程序:
#!/usr/bin/env stack
-- stack --resolver lts-10.4 --install-ghc runghc --package conduit-extra --package stm-conduit
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Monad.STM
import Control.Concurrent.STM.TQueue
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Conduit.Process (CreateProcess (..),
proc, sourceProcessWithStreams)
import qualified Data.Conduit.TQueue as CTQ
import qualified Data.ByteString.Char8 as BS
import Data.Monoid ((<>))
main :: IO ()
main = do
putStrLn "Enter \"exit\" to exit."
q <- open
putStrLn "connection opened"
loop q
where loop q = do
s <- BS.getLine
case s of
"exit" -> return ()
req -> do
atomically $ writeTQueue q req
loop q
open :: IO (TQueue BS.ByteString)
open = do
req <- atomically newTQueue
let chat :: CreateProcess
chat = proc "cat" []
input :: Producer IO BS.ByteString
input = toProducer
$ CTQ.sourceTQueue req
-- .| CL.mapM_ (\bs -> BS.putStrLn (("queue: " :: BS.ByteString) <> bs))
output :: Consumer BS.ByteString IO ()
output = toConsumer
$ CL.mapM_ BS.putStrLn
_ <- forkIO (sourceProcessWithStreams chat input output output >> pure ())
pure req
对于较新的 lts,问题似乎是 而不是 通过 TQueue
进行通信,因为取消注释从输入管道打印内容的行显示来自队列的数据.看起来生成的进程从未在 stdin
.
上收到任何东西
进一步从控制台写入生成的猫 stdin
,如下所示:
echo "test" > /proc/<pid of spawned cat>/fd/0
在我的程序中产生输出。
我是否遗漏了版本之间的变化?
所以问题是 sinkHandle
的默认行为是 changed 在每个数据块之后不刷新。
我已经解决了这个问题,首先移植到 Data.Conduit.Process.Typed
,然后滚动我自己的 createSink
变体,它使用 sinkHandleFlush 而不是 sinkHandle
。
在我的程序中,我正在启动外部进程并通过 stdin
和 stdout
与其通信。我从 STM
s TQueue
开始通过管道(生产者)提供输入。在我决定升级 lts 版本之前,它一直很有效。它适用于 lts <= 8.24。
这是重现我的问题的最小化程序:
#!/usr/bin/env stack
-- stack --resolver lts-10.4 --install-ghc runghc --package conduit-extra --package stm-conduit
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Monad.STM
import Control.Concurrent.STM.TQueue
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Conduit.Process (CreateProcess (..),
proc, sourceProcessWithStreams)
import qualified Data.Conduit.TQueue as CTQ
import qualified Data.ByteString.Char8 as BS
import Data.Monoid ((<>))
main :: IO ()
main = do
putStrLn "Enter \"exit\" to exit."
q <- open
putStrLn "connection opened"
loop q
where loop q = do
s <- BS.getLine
case s of
"exit" -> return ()
req -> do
atomically $ writeTQueue q req
loop q
open :: IO (TQueue BS.ByteString)
open = do
req <- atomically newTQueue
let chat :: CreateProcess
chat = proc "cat" []
input :: Producer IO BS.ByteString
input = toProducer
$ CTQ.sourceTQueue req
-- .| CL.mapM_ (\bs -> BS.putStrLn (("queue: " :: BS.ByteString) <> bs))
output :: Consumer BS.ByteString IO ()
output = toConsumer
$ CL.mapM_ BS.putStrLn
_ <- forkIO (sourceProcessWithStreams chat input output output >> pure ())
pure req
对于较新的 lts,问题似乎是 而不是 通过 TQueue
进行通信,因为取消注释从输入管道打印内容的行显示来自队列的数据.看起来生成的进程从未在 stdin
.
进一步从控制台写入生成的猫 stdin
,如下所示:
echo "test" > /proc/<pid of spawned cat>/fd/0
在我的程序中产生输出。
我是否遗漏了版本之间的变化?
所以问题是 sinkHandle
的默认行为是 changed 在每个数据块之后不刷新。
我已经解决了这个问题,首先移植到 Data.Conduit.Process.Typed
,然后滚动我自己的 createSink
变体,它使用 sinkHandleFlush 而不是 sinkHandle
。