使用 TChan 进行类似管道的操作
Pipeline-like operation using TChan
我想在两个线程之间实现一个管道。我有线程 A 获取数据、处理数据并将其发送到线程 B。我有一个 MVar 检查数据是否已完全处理
但是,我有一个例外*** Exception: thread blocked indefinitely in an STM transaction
为什么我的线程被阻塞了?我虽然比当第一个线程在通道上写入时,当通道上有数据时,第二个线程可以读取它
fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()
pipelineDone channel mIn = do
isDone <- fmap isJust $ tryTakeMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ isDone && isEmpty
lastPipe f chIn mIn = iter
where iter = do
atomically $ fmap f $ readTChan chIn
isDone <- pipelineDone chIn mIn
unless isDone $ iter
pipeline = do
chIn <- atomically newTChan
m <- newEmptyMVar
first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
last <- async $ lastPipe print chIn m
wait first
wait last
我认为存在竞争条件:
- 在
putMVar
之前停止 fstPipe
- 提前
lastPipe
阅读所有内容,然后调用pipelineDone
pipelineDone
returns False
因为 putMVar
还没有完成
lastPipe
将尝试从频道 读取
putMVar
执行,但为时已晚
现在 lastPipe
卡在空频道上阅读。
在同一个代码块中使用 STM 和 信号量对我来说似乎很奇怪......为什么不在 STM 中完成整个事情?
特别是,为什么不用 TChan (Maybe x)
,Nothing
表示序列的结尾?
另外,请注意您的 fstPipe
可能只是生成一堆未计算的 thunk 并立即将它们放入 TChan
,而没有实际计算任何东西。你可能想要一个 seq
或类似的东西来强制一些实际的 work 在那个线程上发生。
你的问题出在pipelineDone
的逻辑上。目前,您有:
pipelineDone channel mIn = do
isDone <- fmap isJust $ tryTakeMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ isDone && isEmpty
tryTakeMVar
将获取 MVar 的内容,假设其中有内容。假设您的生产者首先完成,它将 ()
写入 MVar。然后您的消费者将尝试获取其中的内容。如果成功,则 MVar 变为空。任何后续的 tryTakeMVar
将始终 return Nothing
,因此 isDone && isEmpty
将始终 return false,您将继续尝试从 TChan
中读取。一旦 TChan
变为空,GHC 会告诉您它遇到了死锁。
您应该改为将 pipelineDone 实现更改为:
pipelineDone channel mIn = do
stillRunning <- isEmptyMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ (not stillRunning) && isEmpty
这将改为简单地轮询 MVar,而不是实际清空它。
我想在两个线程之间实现一个管道。我有线程 A 获取数据、处理数据并将其发送到线程 B。我有一个 MVar 检查数据是否已完全处理
但是,我有一个例外*** Exception: thread blocked indefinitely in an STM transaction
为什么我的线程被阻塞了?我虽然比当第一个线程在通道上写入时,当通道上有数据时,第二个线程可以读取它
fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()
pipelineDone channel mIn = do
isDone <- fmap isJust $ tryTakeMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ isDone && isEmpty
lastPipe f chIn mIn = iter
where iter = do
atomically $ fmap f $ readTChan chIn
isDone <- pipelineDone chIn mIn
unless isDone $ iter
pipeline = do
chIn <- atomically newTChan
m <- newEmptyMVar
first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
last <- async $ lastPipe print chIn m
wait first
wait last
我认为存在竞争条件:
- 在
putMVar
之前停止 - 提前
lastPipe
阅读所有内容,然后调用pipelineDone
pipelineDone
returnsFalse
因为putMVar
还没有完成lastPipe
将尝试从频道 读取
putMVar
执行,但为时已晚
fstPipe
现在 lastPipe
卡在空频道上阅读。
在同一个代码块中使用 STM 和 信号量对我来说似乎很奇怪......为什么不在 STM 中完成整个事情?
特别是,为什么不用 TChan (Maybe x)
,Nothing
表示序列的结尾?
另外,请注意您的 fstPipe
可能只是生成一堆未计算的 thunk 并立即将它们放入 TChan
,而没有实际计算任何东西。你可能想要一个 seq
或类似的东西来强制一些实际的 work 在那个线程上发生。
你的问题出在pipelineDone
的逻辑上。目前,您有:
pipelineDone channel mIn = do
isDone <- fmap isJust $ tryTakeMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ isDone && isEmpty
tryTakeMVar
将获取 MVar 的内容,假设其中有内容。假设您的生产者首先完成,它将 ()
写入 MVar。然后您的消费者将尝试获取其中的内容。如果成功,则 MVar 变为空。任何后续的 tryTakeMVar
将始终 return Nothing
,因此 isDone && isEmpty
将始终 return false,您将继续尝试从 TChan
中读取。一旦 TChan
变为空,GHC 会告诉您它遇到了死锁。
您应该改为将 pipelineDone 实现更改为:
pipelineDone channel mIn = do
stillRunning <- isEmptyMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ (not stillRunning) && isEmpty
这将改为简单地轮询 MVar,而不是实际清空它。