使用 TChan 进行类似管道的操作

Pipeline-like operation using TChan

我想在两个线程之间实现一个管道。我有线程 A 获取数据、处理数据并将其发送到线程 B。我有一个 MVar 检查数据是否已完全处理

但是,我有一个例外*** Exception: thread blocked indefinitely in an STM transaction

为什么我的线程被阻塞了?我虽然比当第一个线程在通道上写入时,当通道上有数据时,第二个线程可以读取它

fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
    ( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()

pipelineDone channel mIn = do
    isDone <- fmap isJust $ tryTakeMVar mIn
    isEmpty <- atomically $ isEmptyTChan channel
    return $ isDone && isEmpty

lastPipe f chIn mIn = iter 
    where iter = do
        atomically $ fmap f $ readTChan chIn
        isDone <- pipelineDone chIn mIn
        unless isDone $ iter

pipeline = do
    chIn <- atomically newTChan
    m <- newEmptyMVar
    first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
    last <- async $ lastPipe print chIn m
    wait first
    wait last

我认为存在竞争条件:

  • putMVar
  • 之前停止 fstPipe
  • 提前lastPipe阅读所有内容,然后调用pipelineDone
  • pipelineDone returns False 因为 putMVar 还没有完成
  • lastPipe 将尝试从频道
  • 读取
  • putMVar 执行,但为时已晚

现在 lastPipe 卡在空频道上阅读。

在同一个代码块中使用 STM 信号量对我来说似乎很奇怪......为什么不在 STM 中完成整个事情?

特别是,为什么不用 TChan (Maybe x)Nothing 表示序列的结尾?

另外,请注意您的 fstPipe 可能只是生成一堆未计算的 thunk 并立即将它们放入 TChan,而没有实际计算任何东西。你可能想要一个 seq 或类似的东西来强制一些实际的 work 在那个线程上发生。

你的问题出在pipelineDone的逻辑上。目前,您有:

pipelineDone channel mIn = do
  isDone <- fmap isJust $ tryTakeMVar mIn
  isEmpty <- atomically $ isEmptyTChan channel
  return $ isDone && isEmpty

tryTakeMVar 将获取 MVar 的内容,假设其中有内容。假设您的生产者首先完成,它将 () 写入 MVar。然后您的消费者将尝试获取其中的内容。如果成功,则 MVar 变为空。任何后续的 tryTakeMVar 将始终 return Nothing,因此 isDone && isEmpty 将始终 return false,您将继续尝试从 TChan 中读取。一旦 TChan 变为空,GHC 会告诉您它遇到了死锁。

您应该改为将 pipelineDone 实现更改为:

pipelineDone channel mIn = do
  stillRunning <- isEmptyMVar mIn
  isEmpty <- atomically $ isEmptyTChan channel
  return $ (not stillRunning) && isEmpty

这将改为简单地轮询 MVar,而不是实际清空它。