Memory leak when using Control.Concurrent.STM.TBQueue

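I have two threads: a producer and a consumer. The producer generates (key, value) pairs, and the consumer inserts them into a Map wrapped in a Data.IORef. I tried using Control.Concurrent.BoundedChan for communication between the producer and the consumer, and it works fine (memory usage is constant), provided I use BangPatterns where needed. The code is as follows: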

{-# LANGUAGE BangPatterns #-}

module Main where

import qualified Data.Map.Strict as M
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import qualified Control.Concurrent.BoundedChan as BC
import qualified Control.Concurrent.Chan.Unagi.Bounded as UBC
import qualified Data.Text as T
import System.Random
import Data.IORef
import Control.Monad

data Item = Item !Int !Int

data SinkState = SinkState {
  myMap :: !(M.Map Int Int)
}

testBCs = do
  chan <- BC.newBoundedChan 1000

  forkIO $ source chan
  sink chan
  where
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      BC.writeChan chan $ Item key value

    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- BC.readChan chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))

Now, when I switch from BoundedChan to Control.Concurrent.STM.TBQueue, memory starts leaking:

testTBs = do
  chan <- atomically $ newTBQueue 1000

  forkIO $ source chan
  sink chan
  where
    source chan = forever $ do
      threadDelay 500
      key <- getStdRandom (randomR (1,5000))
      value <- getStdRandom (randomR (1,1000000))
      atomically $ writeTBQueue chan $ Item key value

    sink chan = do
      state <- newIORef SinkState {
          myMap = M.empty
        }
      forever $ do
        (Item key value) <- chan `seq` atomically $ readTBQueue chan
        atomicModifyIORef' state (\s -> (s { myMap = myMap s `seq` M.insert key value (myMap s) }, ()))

The heap profile looks like this (run with +RTS -hd):

So my questions are:

  1. What is going on in the second case?
  2. What are those stg_ap_2_upd_info and other symbols?
  3. How can I fix the leak?

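Without digging into the space leak itself, one solution is to move the consumer logic entirely into STM. That is as simple as replacing the IORef with a TVar. But to take full advantage of STM, the queue read and the state update should go into a single atomically block, so that both operations run in the same transaction. A useful side effect is that we also gain exception safety: if the transaction is aborted by an exception, the item that was read is not lost.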

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.Random

import qualified Data.Map.Strict as M

data Item = Item {-# UNPACK #-} !Int {-# UNPACK #-} !Int

data SinkState = SinkState {
  myMap :: !(M.Map Int Int)
}

main :: IO ()
main = do
  chan <- newTBQueueIO 1000

  forkIO . forever $ do
    threadDelay 500
    key <- getStdRandom $ randomR (1,5000)
    value <- getStdRandom $ randomR (1,1000000)
    atomically . writeTBQueue chan $ Item key value

  state <- newTVarIO SinkState {
      myMap = M.empty
    }

  forever . atomically $ do
    Item key value <- readTBQueue chan
    modifyTVar' state $ \s -> s { myMap = M.insert key value (myMap s) }
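
Since the program above runs forever, here is a minimal terminating sketch of the same idea that is easier to experiment with. It is not part of the original code: the helper names consumeOne, runBounded and rollbackDemo, the queue sizes and the key scheme (i `mod` 10) are all assumptions made for illustration. It shows the combined read-and-update transaction, and what the exception safety mentioned above might look like in practice: when a transaction throws after reading from the queue, the read is rolled back.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (ErrorCall (..), try)
import Control.Monad (forM_, replicateM_)
import qualified Data.Map.Strict as M

data Item = Item !Int !Int

-- Read one item and update the map in a single transaction,
-- as in the consumer loop above.
consumeOne :: TBQueue Item -> TVar (M.Map Int Int) -> STM ()
consumeOne chan state = do
  Item key value <- readTBQueue chan
  modifyTVar' state (M.insert key value)

-- Hypothetical helper: push n items through a small bounded queue
-- and return the final map once all of them have been consumed.
runBounded :: Int -> IO (M.Map Int Int)
runBounded n = do
  chan  <- newTBQueueIO 16
  state <- newTVarIO M.empty
  _ <- forkIO $ forM_ [1 .. n] $ \i ->
         atomically $ writeTBQueue chan (Item (i `mod` 10) i)
  replicateM_ n $ atomically (consumeOne chan state)
  readTVarIO state

-- Hypothetical demo: the transaction reads an item and then throws.
-- Returns whether the exception was caught, and what is left in the queue.
rollbackDemo :: IO (Bool, Maybe (Int, Int))
rollbackDemo = do
  chan <- newTBQueueIO 4
  atomically $ writeTBQueue chan (Item 1 42)
  r <- try (atomically $ readTBQueue chan >> throwSTM (ErrorCall "boom"))
         :: IO (Either ErrorCall ())
  left <- atomically $ tryReadTBQueue chan
  pure (either (const True) (const False) r, fmap (\(Item k v) -> (k, v)) left)

main :: IO ()
main = do
  m <- runBounded 100
  print (M.size m, M.lookup 7 m)
  rollbackDemo >>= print
```

With a single producer and a single consumer the items arrive in order, so runBounded 100 should end with ten keys, each holding the last value written for it. In rollbackDemo the item is still in the queue after the failed transaction, which would not be guaranteed if the read and the state update were two separate IO steps.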