Memory leak when using Control.Concurrent.STM.TBQueue
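I have two threads: a producer and a consumer. The producer generates (key, value) pairs, and the consumer inserts them into a Map wrapped in a Data.IORef. I tried using Control.Concurrent.BoundedChan to communicate between the producer and the consumer, and it works fine (memory usage stays constant) as long as I use BangPatterns where needed. Here is the code: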
{-# LANGUAGE BangPatterns #-}
module Main where

import qualified Data.Map.Strict as M
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import qualified Control.Concurrent.BoundedChan as BC
import qualified Control.Concurrent.Chan.Unagi.Bounded as UBC
import qualified Data.Text as T
import System.Random
import Data.IORef
import Control.Monad

data Item = Item !Int !Int

data SinkState = SinkState {
    myMap :: !(M.Map Int Int)
  }

testBCs :: IO ()
testBCs = do
    chan <- BC.newBoundedChan 1000
    forkIO $ source chan
    sink chan
  where
    -- Producer: push a random (key, value) pair roughly every 0.5 ms
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      BC.writeChan chan $ Item key value
    -- Consumer: insert each pair into the Map held in the IORef
    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- BC.readChan chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))
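The constant memory use of the BoundedChan version rests on every update being forced as it is stored. A minimal sketch of just that part, without the channel or the producer thread (the `insertAll` helper and the generated key/value data are hypothetical, chosen only to mirror the question's key range): `atomicModifyIORef'` evaluates the new `SinkState` to weak head normal form before storing it, and because `myMap` is a strict field, that also evaluates the `Map`, whose spine is strict and whose values `Data.Map.Strict` forces on insert.

```haskell
import Control.Monad (forM_, unless)
import Data.IORef
import qualified Data.Map.Strict as M

-- Same shape as the question's state type. The strict field means that
-- evaluating a SinkState to WHNF also evaluates the Map it holds.
data SinkState = SinkState { myMap :: !(M.Map Int Int) }

-- Hypothetical helper (not from the question): one strict update per pair.
-- atomicModifyIORef' forces the new SinkState before storing it, so no
-- chain of pending M.insert thunks accumulates inside the IORef.
insertAll :: IORef SinkState -> [(Int, Int)] -> IO ()
insertAll ref kvs = forM_ kvs $ \(k, v) ->
  atomicModifyIORef' ref (\s -> (s { myMap = M.insert k v (myMap s) }, ()))

main :: IO ()
main = do
  ref <- newIORef (SinkState M.empty)
  -- 100000 updates spread over the 5000 distinct keys 1..5000, so the map
  -- stops growing once every key has been seen.
  insertAll ref [ (i `mod` 5000 + 1, i) | i <- [1 .. 100000] ]
  s <- readIORef ref
  print (M.size (myMap s)) -- prints 5000
```

With the lazy `atomicModifyIORef` instead, this sketch would print the same number, but each call would store an unevaluated thunk that is only forced when the map is finally read.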
Now, when I switch from BoundedChan to Control.Concurrent.STM.TBQueue, memory starts leaking:
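testTBs :: IO ()
testTBs = do
    chan <- atomically $ newTBQueue 1000
    forkIO $ source chan
    sink chan
  where
    -- Producer: identical to testBCs, but writes into the TBQueue
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      atomically $ writeTBQueue chan $ Item key value
    -- Consumer: reads in one transaction, then updates the IORef outside STM
    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- chan `seq` atomically $ readTBQueue chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))

-- Entry point (not shown in the question); use testBCs to compare.
main :: IO ()
main = testTBs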
The profiling results look like this (run with +RTS -hd):
So my questions are:
- What is going on in the second case?
- What are those stg_ap_2_upd_info and other symbols?
- How can I fix the leak?
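Without digging into the space leak itself, one solution is to move the consumer logic entirely into STM. This is as simple as replacing the IORef with a TVar. To take full advantage of STM, however, the queue read and the state update should go in a single atomically block, so that both operations execute in one transaction. A useful side effect is that we also gain exception safety: if the transaction is aborted by an exception, neither the read nor the update takes effect.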
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.Random
import qualified Data.Map.Strict as M

data Item = Item {-# UNPACK #-} !Int {-# UNPACK #-} !Int

data SinkState = SinkState {
    myMap :: !(M.Map Int Int)
  }

main :: IO ()
main = do
  chan <- newTBQueueIO 1000
  -- Producer: unchanged apart from using newTBQueueIO
  forkIO . forever $ do
    threadDelay 500
    key <- getStdRandom $ randomR (1,5000)
    value <- getStdRandom $ randomR (1,1000000)
    atomically . writeTBQueue chan $ Item key value
  -- The consumer state now lives in a TVar instead of an IORef
  state <- newTVarIO SinkState {
      myMap = M.empty
    }
  -- Consumer: the queue read and the strict map update form one transaction
  forever . atomically $ do
    Item key value <- readTBQueue chan
    modifyTVar' state $ \s -> s { myMap = M.insert key value (myMap s) }
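The exception-safety point can be checked in isolation. The sketch below is an illustration, not the answer's program: `consumeOne` is a hypothetical consumer step over a `TBQueue (Int, Int)`, and treating a negative value as an error is an assumption made only to force a failure. Because `throwSTM` aborts the whole transaction, the `readTBQueue` is undone together with the update, so the item is still in the queue afterwards.

```haskell
import Control.Concurrent.STM
import Control.Exception (ErrorCall (..), try)
import Control.Monad (unless, when)
import qualified Data.Map.Strict as M

-- Hypothetical consumer step in the style of the answer: the queue read
-- and the map update run inside one STM transaction.
consumeOne :: TBQueue (Int, Int) -> TVar (M.Map Int Int) -> STM ()
consumeOne q st = do
  (k, v) <- readTBQueue q
  -- Assumption for this demo only: a negative value counts as a failure.
  -- throwSTM aborts the transaction, so the read above is discarded too.
  when (v < 0) $ throwSTM (ErrorCall "negative value")
  modifyTVar' st (M.insert k v)

main :: IO ()
main = do
  q <- newTBQueueIO 10
  st <- newTVarIO M.empty
  atomically $ writeTBQueue q (1, -5)
  r <- try (atomically (consumeOne q st)) :: IO (Either ErrorCall ())
  -- The failed transaction left both the queue and the map untouched.
  pending <- atomically (tryReadTBQueue q)
  m <- readTVarIO st
  putStrLn (either (const "transaction aborted") (const "transaction committed") r)
  print pending
  print (M.toList m)
```

With separate `atomically` calls for the read and the update, or an `IORef` update after the read as in the question, the same failure would lose the item that had already been dequeued.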