我将如何使用每次传入重置的超时进行管道传输?

How would I pipe with a timeout that resets with each incoming?

如果没有收到任何内容,withTimeout 函数假设每 s :: Int 秒通过管道 ConsoleEvent 发送一个 CeTimeout。相反,它无法在适当的时间发送 CeTimeout 事件。如果原始事件丢失超过 s 秒,一个 CeTimeout 事件将被其他事件替换。此外,它应该是 n*s CeTimeout 个事件,而不是一个 CeTimeout 事件,每个 s 第二个周期都计算 n 。错误在哪里,更正的地方是什么?谢谢!

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ bounded 1

        tid <- launchTimeout oTimeout >>= newMVar

        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      (atomically $ send o ce) >>= \case
        True -> return ()
        otherwise -> die $ "withTimeout can not send"

    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      (atomically $ recv i) >>= \case
        Just a -> return a
        otherwise -> die $ "withTimeout can not recv"

    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

这是一个完整的可执行文件 script

似乎 Pipe 每个 await 只允许一个 yield。这意味着 CeTimeout 不能任意地沿着管道发送,因为没有任何东西进入管道导致流动。我将不得不通过源代码来确认这一点;同时,此函数已重构为 return、PipeProducer 而不仅仅是 PipeProducer 然后可以在调用函数中重新加入。最初的计划是 return 只是一个 Pipe,这样调用函数就不必做任何额外的工作来使超时工作。那将是一个更独立的解决方案。这个替代方案很好,因为它更明确。对于不熟悉管道的人来说,超时看起来不会像凭空出现。

withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
  do
    (oTimeout, iTimeout) <- spawn $ bounded 1
    vTid <- launchTimeout oTimeout >>= newMVar

    return (factorTimeout vTid oTimeout, fromInput iTimeout)
  where
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

main :: IO ()
main =
  do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    exitSemaphore <- newEmptyMVar
    (o1, i1) <- spawn $ bounded 1
    (o2, i2) <- spawn $ bounded 1

    (timeoutTrap, timeoutRender) <- withTimeout 2

    runEffect $ yield CeBegan >-> toOutput o1

    forkIO $ do
      runEffect . forever $ chars >-> toOutput o1
      putMVar exitSemaphore ()

    -- other inputs would be piped to o1 here

    forkIO $ do
      runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      runEffect . forever $ timeoutRender >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      -- logic would be done before dumpPipe
      runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
      putMVar exitSemaphore ()

    takeMVar exitSemaphore

这是一个完整的可执行文件 script