我将如何使用每次传入重置的超时进行管道传输?
How would I pipe with a timeout that resets with each incoming?
如果没有收到任何内容,withTimeout
函数假设每 s :: Int
秒通过管道 ConsoleEvent
发送一个 CeTimeout
。相反,它无法在适当的时间发送 CeTimeout
事件。如果原始事件丢失超过 s
秒,一个 CeTimeout
事件将被其他事件替换。此外,它应该是 n*s
CeTimeout
个事件,而不是一个 CeTimeout
事件,每个 s
第二个周期都计算 n
。错误在哪里,更正的地方是什么?谢谢!
withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
where
work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
work =
do
(oSent, iKept) <- spawn $ bounded 1
(oKept, iSent) <- spawn $ unbounded
(oTimeout, iTimeout) <- spawn $ bounded 1
tid <- launchTimeout oTimeout >>= newMVar
forkIO $ do
runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept
forkIO $ do
runEffect . forever $ fromInput iTimeout >-> toOutput oKept
return $ do
await >>= (liftIO . guardedSend oSent)
(liftIO . guardedRecv $ iSent) >>= yield
guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
guardedSend o ce =
(atomically $ send o ce) >>= \case
True -> return ()
otherwise -> die $ "withTimeout can not send"
guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
guardedRecv i =
(atomically $ recv i) >>= \case
Just a -> return a
otherwise -> die $ "withTimeout can not recv"
launchTimeout :: Output ConsoleEvent -> IO ThreadId
launchTimeout o =
forkIO . forever $ do
threadDelay $ s
(atomically $ send o CeTimeout) >>= \case
True -> return ()
otherwise -> die "withTimeout can not send timeout"
relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
relaunchTimeout o oldTid =
do
tid <- launchTimeout o
killThread oldTid
return tid
factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
factorTimeout v o =
do
ce <- await
liftIO . modifyMVar_ v $ relaunchTimeout o
yield ce
这是一个完整的可执行文件 script。
似乎 Pipe
每个 await
只允许一个 yield
。这意味着 CeTimeout
不能任意地沿着管道发送,因为没有任何东西进入管道导致流动。我将不得不通过源代码来确认这一点;同时,此函数已重构为 return、Pipe
和 Producer
而不仅仅是 Pipe
。 Producer
然后可以在调用函数中重新加入。最初的计划是 return 只是一个 Pipe
,这样调用函数就不必做任何额外的工作来使超时工作。那将是一个更独立的解决方案。这个替代方案很好,因为它更明确。对于不熟悉管道的人来说,超时看起来不会像凭空出现。
withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
do
(oTimeout, iTimeout) <- spawn $ bounded 1
vTid <- launchTimeout oTimeout >>= newMVar
return (factorTimeout vTid oTimeout, fromInput iTimeout)
where
launchTimeout :: Output ConsoleEvent -> IO ThreadId
launchTimeout o =
forkIO . forever $ do
threadDelay $ s
(atomically $ send o CeTimeout) >>= \case
True -> return ()
otherwise -> die "withTimeout can not send timeout"
relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
relaunchTimeout o oldTid =
do
tid <- launchTimeout o
killThread oldTid
return tid
factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
factorTimeout v o =
do
ce <- await
liftIO . modifyMVar_ v $ relaunchTimeout o
yield ce
main :: IO ()
main =
do
hSetBuffering stdin NoBuffering
hSetEcho stdin False
exitSemaphore <- newEmptyMVar
(o1, i1) <- spawn $ bounded 1
(o2, i2) <- spawn $ bounded 1
(timeoutTrap, timeoutRender) <- withTimeout 2
runEffect $ yield CeBegan >-> toOutput o1
forkIO $ do
runEffect . forever $ chars >-> toOutput o1
putMVar exitSemaphore ()
-- other inputs would be piped to o1 here
forkIO $ do
runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
putMVar exitSemaphore ()
forkIO $ do
runEffect . forever $ timeoutRender >-> toOutput o2
putMVar exitSemaphore ()
forkIO $ do
-- logic would be done before dumpPipe
runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
putMVar exitSemaphore ()
takeMVar exitSemaphore
这是一个完整的可执行文件 script。
如果没有收到任何内容,withTimeout
函数假设每 s :: Int
秒通过管道 ConsoleEvent
发送一个 CeTimeout
。相反,它无法在适当的时间发送 CeTimeout
事件。如果原始事件丢失超过 s
秒,一个 CeTimeout
事件将被其他事件替换。此外,它应该是 n*s
CeTimeout
个事件,而不是一个 CeTimeout
事件,每个 s
第二个周期都计算 n
。错误在哪里,更正的地方是什么?谢谢!
withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
where
work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
work =
do
(oSent, iKept) <- spawn $ bounded 1
(oKept, iSent) <- spawn $ unbounded
(oTimeout, iTimeout) <- spawn $ bounded 1
tid <- launchTimeout oTimeout >>= newMVar
forkIO $ do
runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept
forkIO $ do
runEffect . forever $ fromInput iTimeout >-> toOutput oKept
return $ do
await >>= (liftIO . guardedSend oSent)
(liftIO . guardedRecv $ iSent) >>= yield
guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
guardedSend o ce =
(atomically $ send o ce) >>= \case
True -> return ()
otherwise -> die $ "withTimeout can not send"
guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
guardedRecv i =
(atomically $ recv i) >>= \case
Just a -> return a
otherwise -> die $ "withTimeout can not recv"
launchTimeout :: Output ConsoleEvent -> IO ThreadId
launchTimeout o =
forkIO . forever $ do
threadDelay $ s
(atomically $ send o CeTimeout) >>= \case
True -> return ()
otherwise -> die "withTimeout can not send timeout"
relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
relaunchTimeout o oldTid =
do
tid <- launchTimeout o
killThread oldTid
return tid
factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
factorTimeout v o =
do
ce <- await
liftIO . modifyMVar_ v $ relaunchTimeout o
yield ce
这是一个完整的可执行文件 script。
似乎 Pipe
每个 await
只允许一个 yield
。这意味着 CeTimeout
不能任意地沿着管道发送,因为没有任何东西进入管道导致流动。我将不得不通过源代码来确认这一点;同时,此函数已重构为 return、Pipe
和 Producer
而不仅仅是 Pipe
。 Producer
然后可以在调用函数中重新加入。最初的计划是 return 只是一个 Pipe
,这样调用函数就不必做任何额外的工作来使超时工作。那将是一个更独立的解决方案。这个替代方案很好,因为它更明确。对于不熟悉管道的人来说,超时看起来不会像凭空出现。
withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
do
(oTimeout, iTimeout) <- spawn $ bounded 1
vTid <- launchTimeout oTimeout >>= newMVar
return (factorTimeout vTid oTimeout, fromInput iTimeout)
where
launchTimeout :: Output ConsoleEvent -> IO ThreadId
launchTimeout o =
forkIO . forever $ do
threadDelay $ s
(atomically $ send o CeTimeout) >>= \case
True -> return ()
otherwise -> die "withTimeout can not send timeout"
relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
relaunchTimeout o oldTid =
do
tid <- launchTimeout o
killThread oldTid
return tid
factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
factorTimeout v o =
do
ce <- await
liftIO . modifyMVar_ v $ relaunchTimeout o
yield ce
main :: IO ()
main =
do
hSetBuffering stdin NoBuffering
hSetEcho stdin False
exitSemaphore <- newEmptyMVar
(o1, i1) <- spawn $ bounded 1
(o2, i2) <- spawn $ bounded 1
(timeoutTrap, timeoutRender) <- withTimeout 2
runEffect $ yield CeBegan >-> toOutput o1
forkIO $ do
runEffect . forever $ chars >-> toOutput o1
putMVar exitSemaphore ()
-- other inputs would be piped to o1 here
forkIO $ do
runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
putMVar exitSemaphore ()
forkIO $ do
runEffect . forever $ timeoutRender >-> toOutput o2
putMVar exitSemaphore ()
forkIO $ do
-- logic would be done before dumpPipe
runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
putMVar exitSemaphore ()
takeMVar exitSemaphore
这是一个完整的可执行文件 script。