Control.Concurrent.MonadIO 的管道和叉子

pipes and fork with Control.Concurrent.MonadIO

在下面的代码中,我试图将 2 个 Producer 合并为 1 个。它们都具有相同的类型。它们将由 2 个输入 Producer 中的每一个 运行 组合在一个单独的线程中,并被 Consumer 消耗,将值放入 unagi chan(我正在使用 unagi陈的表现)。返回一个从 chan 读取的生产者。我希望能够使用任何 Producer,只要它可以 运行 IO 所以我将 monad 的 class 限制为 MonadIOHasFork.

import Pipes (Producer, Consumer, (>->), yield, await, runEffect)
import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan, readChan, writeChan)
import Control.Monad (forever, void)
import Control.Concurrent.MonadIO (MonadIO, HasFork, liftIO, fork)

combine :: (MonadIO m, HasFork m) => Producer a m r -> Producer a m r -> Producer a m r
combine p1 p2 = do
  (inChan, outChan) <- liftIO $ newChan
  t1 <- fork . void . runEffect $ p1 >-> (consumer inChan)
  t2 <- fork . void . runEffect $ p2 >-> (consumer inChan)
  producer outChan

producer :: (MonadIO m) => OutChan a -> Producer a m r
producer outChan = forever $ do
  msg <- liftIO $ readChan outChan
  yield msg

consumer :: (MonadIO m) => InChan a -> Consumer a m r
consumer inChan = forever $ do
  msg <- await
  liftIO $ writeChan inChan msg

但是我收到以下错误:

    • Couldn't match type ‘m’
                 with ‘Pipes.Internal.Proxy Pipes.Internal.X () () a m’
  ‘m’ is a rigid type variable bound by
    the type signature for:
      combine :: forall (m :: * -> *) a r.
                 (MonadIO m, HasFork m) =>
                 Producer a m r -> Producer a m r -> Producer a m r
    at src/Pipes/Unagi.hs:8:12
  Expected type: Pipes.Internal.Proxy
                   Pipes.Internal.X () () a m GHC.Conc.Sync.ThreadId
    Actual type: m GHC.Conc.Sync.ThreadId

我想做的事情可行吗?

fork前加上lift

t1 <- lift . fork . ...

在你的函数中,fork . ... 的类型为 m ThreadId,但是 do 块在 monad Producer a m.

另外,lifted-base 有更新的 fork