为什么在添加 bracketOnError 时此代码内存泄漏?

Why does this code memory leak when adding `bracketOnError`?

首先,我很抱歉没有一个最小的例子(我可以尝试构建一个,但现在我有一个“之前和之后”的例子):

首先是发生内存泄漏的“after”:

protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  loopBody <- pure $ bracketOnError
    (runResourceT $ protoServe fdsEnv tMap readFarmPCMessage)
    (\(_,w) -> do
      logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
      )
    (\(server,_) -> do
      logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
      server
        .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
        .| mapMC ((logLogIt Info lgr) . pure)
        .| sinkUnits & runConduitRes
      )
  liftIO loopBody

这是没有内存泄漏的“之前”代码:

protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv

  (dmgrProtoServe, tcpWorker) <- liftIO $ runResourceT
    $ protoServe fdsEnv tMap readFarmPCMessage
  liftIO $ runResourceT $ dmgrProtoServe
    .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
    .| mapMC ((logLogIt Info lgr) . pure)
    .| sinkUnits & runConduit

我对泄漏进行了一些分析,但我不确定它是否特别有用(感谢任何关于更好的分析图表的建议):

问题是经典泄漏场景的一种变体,在这种情况下,我们在使用惰性列表时保留对头部的引用:

import Data.Foldable (traverse_)

main :: IO ()
main = do
    let xs = [1..]
    traverse_ print xs 
    traverse_ print xs -- commenting this statement solves the leak 

在这里,管道 Source 就像一个“惰性列表”。我们需要保留对原始源值 (server) 的引用,即使我们使用它也是如此,因为在发生错误时必须将其传递给异常处理程序。然而异常处理程序似乎并没有使用它。

解决方案是在我们传递给 bracketOnError 的主要计算获得该值后立即切断该引用。为此,我们可以使用 MVar。不是因为它的同步能力,而是因为它是一个可以“留空”的可变引用。

分配操作可以 return 一个 (MVar (Source m r), a) 值,而不是 return 一个 (Source m r, a) 值。然后,主要计算将执行 takeMVar 以获取管道源。一旦我们开始使用源,原始值将被垃圾收集,因为不会再保留对它的任何引用。

这是 OP 在遵循这些建议后使用的工作代码:

protoReceiver :: RIO FdsEnv ()
protoReceiver = retryForever $ do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  loopBody <- pure $ bracket
    (runResourceT $ do
      swTup <- protoServe fdsEnv tMap readFarmPCMessage
      serverMVar <- newMVar $ fst swTup
      pure (serverMVar, snd $! swTup)
      )
    (\(_, worker) -> do
      logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
      killChildThreads tMap
      cancel worker
      )
    (\(serverMVar, _) -> do
      logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
      server <- takeMVar serverMVar
      logLogItS Debug lgr ["FarmPCMessage protoReceiver bracket: got server"]
      server
        .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
        .| mapMC ((logLogIt Info lgr) . pure)
        .| sinkUnits & runConduitRes
      )
  liftIO $ retryForever $ loopBody
  where
    killChildThreads = liftIO . killThreadHierarchy