为什么在添加 bracketOnError 时此代码内存泄漏?
Why does this code memory leak when adding `bracketOnError`?
首先,我很抱歉没有一个最小的例子(我可以尝试构建一个,但现在我有一个“之前和之后”的例子):
首先是发生内存泄漏的“after”:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
loopBody <- pure $ bracketOnError
(runResourceT $ protoServe fdsEnv tMap readFarmPCMessage)
(\(_,w) -> do
logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
)
(\(server,_) -> do
logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
server
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduitRes
)
liftIO loopBody
这是没有内存泄漏的“之前”代码:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
(dmgrProtoServe, tcpWorker) <- liftIO $ runResourceT
$ protoServe fdsEnv tMap readFarmPCMessage
liftIO $ runResourceT $ dmgrProtoServe
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduit
我对泄漏进行了一些分析,但我不确定它是否特别有用(感谢任何关于更好的分析图表的建议):
问题是经典泄漏场景的一种变体,在这种情况下,我们在使用惰性列表时保留对头部的引用:
import Data.Foldable (traverse_)
main :: IO ()
main = do
let xs = [1..]
traverse_ print xs
traverse_ print xs -- commenting this statement solves the leak
在这里,管道 Source
就像一个“惰性列表”。我们需要保留对原始源值 (server
) 的引用,即使我们使用它也是如此,因为在发生错误时必须将其传递给异常处理程序。然而异常处理程序似乎并没有使用它。
解决方案是在我们传递给 bracketOnError
的主要计算获得该值后立即切断该引用。为此,我们可以使用 MVar
。不是因为它的同步能力,而是因为它是一个可以“留空”的可变引用。
分配操作可以 return 一个 (MVar (Source m r), a)
值,而不是 return 一个 (Source m r, a)
值。然后,主要计算将执行 takeMVar
以获取管道源。一旦我们开始使用源,原始值将被垃圾收集,因为不会再保留对它的任何引用。
这是 OP 在遵循这些建议后使用的工作代码:
protoReceiver :: RIO FdsEnv ()
protoReceiver = retryForever $ do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
loopBody <- pure $ bracket
(runResourceT $ do
swTup <- protoServe fdsEnv tMap readFarmPCMessage
serverMVar <- newMVar $ fst swTup
pure (serverMVar, snd $! swTup)
)
(\(_, worker) -> do
logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
killChildThreads tMap
cancel worker
)
(\(serverMVar, _) -> do
logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
server <- takeMVar serverMVar
logLogItS Debug lgr ["FarmPCMessage protoReceiver bracket: got server"]
server
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduitRes
)
liftIO $ retryForever $ loopBody
where
killChildThreads = liftIO . killThreadHierarchy
首先,我很抱歉没有一个最小的例子(我可以尝试构建一个,但现在我有一个“之前和之后”的例子):
首先是发生内存泄漏的“after”:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
loopBody <- pure $ bracketOnError
(runResourceT $ protoServe fdsEnv tMap readFarmPCMessage)
(\(_,w) -> do
logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
)
(\(server,_) -> do
logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
server
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduitRes
)
liftIO loopBody
这是没有内存泄漏的“之前”代码:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
(dmgrProtoServe, tcpWorker) <- liftIO $ runResourceT
$ protoServe fdsEnv tMap readFarmPCMessage
liftIO $ runResourceT $ dmgrProtoServe
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduit
我对泄漏进行了一些分析,但我不确定它是否特别有用(感谢任何关于更好的分析图表的建议):
问题是经典泄漏场景的一种变体,在这种情况下,我们在使用惰性列表时保留对头部的引用:
import Data.Foldable (traverse_)
main :: IO ()
main = do
let xs = [1..]
traverse_ print xs
traverse_ print xs -- commenting this statement solves the leak
在这里,管道 Source
就像一个“惰性列表”。我们需要保留对原始源值 (server
) 的引用,即使我们使用它也是如此,因为在发生错误时必须将其传递给异常处理程序。然而异常处理程序似乎并没有使用它。
解决方案是在我们传递给 bracketOnError
的主要计算获得该值后立即切断该引用。为此,我们可以使用 MVar
。不是因为它的同步能力,而是因为它是一个可以“留空”的可变引用。
分配操作可以 return 一个 (MVar (Source m r), a)
值,而不是 return 一个 (Source m r, a)
值。然后,主要计算将执行 takeMVar
以获取管道源。一旦我们开始使用源,原始值将被垃圾收集,因为不会再保留对它的任何引用。
这是 OP 在遵循这些建议后使用的工作代码:
protoReceiver :: RIO FdsEnv ()
protoReceiver = retryForever $ do
logItS Info ["Entering FarmPCMessage protoReceiver"]
tMap <- liftIO $ newThreadMap
fdsEnv <- ask
let lgr = fdsLogger fdsEnv
loopBody <- pure $ bracket
(runResourceT $ do
swTup <- protoServe fdsEnv tMap readFarmPCMessage
serverMVar <- newMVar $ fst swTup
pure (serverMVar, snd $! swTup)
)
(\(_, worker) -> do
logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
killChildThreads tMap
cancel worker
)
(\(serverMVar, _) -> do
logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
server <- takeMVar serverMVar
logLogItS Debug lgr ["FarmPCMessage protoReceiver bracket: got server"]
server
.| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
.| mapMC ((logLogIt Info lgr) . pure)
.| sinkUnits & runConduitRes
)
liftIO $ retryForever $ loopBody
where
killChildThreads = liftIO . killThreadHierarchy