减少 Haskell 程序中的垃圾收集暂停时间
Reducing garbage-collection pause time in a Haskell program
我们正在开发一个接收和转发 "messages" 的程序,同时保留这些消息的临时历史记录,以便它可以在需要时告诉您消息历史记录。消息以数字方式标识,通常大小约为 1 KB,我们需要保留数十万条此类消息。
我们希望针对延迟优化此程序:发送和接收消息之间的时间必须低于 10 毫秒。
程序是用Haskell编写的,用GHC编译的。但是,我们发现垃圾收集暂停对于我们的延迟要求来说太长了:在我们的真实程序中超过 100 毫秒。
以下程序是我们应用程序的简化版本。它使用 Data.Map.Strict
来存储消息。消息是由 Int
标识的 ByteString
。以递增的数字顺序插入 1,000,000 条消息,并不断删除最旧的消息以将历史记录保持在最多 200,000 条消息。
module Main (main) where
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if 200000 < Map.size inserted
then Map.deleteMin inserted
else inserted
main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
我们编译并运行这个程序使用:
$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s
Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s
INIT time 0.000s ( 0.000s elapsed)
MUT time 0.652s ( 0.745s elapsed)
GC time 0.417s ( 0.530s elapsed)
EXIT time 0.010s ( 0.052s elapsed)
Total time 1.079s ( 1.326s elapsed)
%GC time 38.6% (40.0% elapsed)
Alloc rate 4,780,213,353 bytes per MUT second
Productivity 61.4% of total user, 49.9% of total elapsed
此处的重要指标是 "max pause" 0.0515 秒,即 51 毫秒。我们希望将其减少至少一个数量级。
实验表明,GC暂停的时间长度由历史消息的数量决定。这种关系大致是线性的,或者可能是超线性的。下面的 table 显示了这种关系。 (You can see our benchmarking tests here, and some charts here.)
msgs history length max GC pause (ms)
=================== =================
12500 3
25000 6
50000 13
100000 30
200000 56
400000 104
800000 199
1600000 487
3200000 1957
6400000 5378
我们已经对其他几个变量进行了实验,以确定它们是否可以减少这种延迟,none 其中有很大的不同。这些不重要的变量包括:优化(-O
、-O2
); RTS GC 选项(-G
、-H
、-A
、-c
)、核心数(-N
)、不同的数据结构(Data.Sequence
) 、消息的大小以及生成的短期垃圾的数量。压倒性的决定因素是历史消息的数量。
我们的工作理论是停顿与消息数量呈线性关系,因为每个 GC 周期都必须遍历所有工作的可访问内存并复制它,这显然是线性操作。
问题:
- 这个线性时间理论正确吗? GC 暂停的长度可以用这种简单的方式表示吗,还是现实更复杂?
- 如果GC停顿在工作内存中是线性的,有没有办法减少涉及的常数因子?
- 是否有增量 GC 或类似的选项?我们只能看到研究论文。我们非常愿意用吞吐量来换取更低的延迟。
- 除了拆分成多个进程之外,还有什么方法可以 "partition" 内存用于更小的 GC 周期?
你实际上做得很好,有 51 毫秒的暂停时间和超过 200Mb 的实时数据。我工作的系统有一个更大的最大暂停时间和一半的实时数据。
您的假设是正确的,主要的 GC 暂停时间与实时数据量成正比,不幸的是,目前的 GHC 无法解决这个问题。我们过去曾尝试过增量 GC,但这是一个研究项目,没有达到将其纳入已发布的 GHC 所需的成熟度。
我们希望将来能对此有所帮助的一件事是紧凑区域:https://phabricator.haskell.org/D1264。这是一种手动内存管理,您可以在堆中压缩一个结构,而 GC 不必遍历它。它最适合长期存在的数据,但也许它足以用于您设置中的单个消息。我们的目标是在 GHC 8.2.0 中使用它。
如果您处于分布式环境中并且拥有某种负载均衡器,则可以使用一些技巧来避免暂停命中,您基本上可以确保负载均衡器不会将请求发送到即将进行主要 GC 的机器,当然要确保机器仍然完成 GC,即使它没有收到请求。
嗯,您发现了具有 GC 的语言的局限性:它们不适合硬核实时系统。
您有 2 个选择:
1st 增加堆大小并使用 2 级缓存系统,将最旧的消息发送到磁盘并将最新的消息保留在内存中,您可以通过使用 OS 分页来实现。尽管使用此解决方案,但问题是分页可能会很昂贵,具体取决于所使用的辅助内存单元的读取能力。
第二个使用 'C' 解决方案的程序,并将其与 FFI 连接到 haskell。这样你就可以进行自己的内存管理。这将是最好的选择,因为您可以自己控制所需的内存。
我必须同意其他人的意见 - 如果您有严格的实时约束,那么使用 GC 语言并不理想。
但是,您可以考虑尝试其他可用的数据结构,而不仅仅是 Data.Map。
我使用 Data.Sequence 重写了它并获得了一些有希望的改进:
msgs history length max GC pause (ms)
=================== =================
12500 0.7
25000 1.4
50000 2.8
100000 5.4
200000 10.9
400000 21.8
800000 46
1600000 87
3200000 175
6400000 350
尽管您针对延迟进行了优化,但我注意到其他指标也有所改善。在 200000 的情况下,执行时间从 1.5s 下降到 0.2s,总内存使用量从 600MB 下降到 27MB。
请注意,我通过调整设计作弊:
- 我从
Msg
中删除了 Int
,所以它不在两个地方。
- 我没有使用从
Int
s 到 ByteString
s 的映射,而是使用了 Sequence
的 ByteString
s,而不是一个 Int
每条消息,我认为可以用一个 Int
来完成整个 Sequence
。假设消息无法重新排序,您可以使用单个偏移量将您想要的消息转换到它在队列中的位置。
(我添加了一个附加函数 getMsg
来证明这一点。)
{-# LANGUAGE BangPatterns #-}
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S
newtype Msg = Msg ByteString.ByteString
data Chan = Chan Int (Seq ByteString.ByteString)
message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))
maxSize :: Int
maxSize = 200000
pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
Exception.evaluate $
let newSize = 1 + S.length sq
newSq = sq |> msgContent
in
if newSize <= maxSize
then Chan offset newSq
else
case S.viewl newSq of
(_ :< newSq') -> Chan (offset+1) newSq'
S.EmptyL -> error "Can't happen"
getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
where
getMsg' i
| i < 0 = Nothing
| i >= S.length sq = Nothing
| otherwise = Just (Msg (S.index sq i))
main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
我已经使用 IOVector
作为基础数据结构,通过环形缓冲区方法尝试了您的代码片段。在我的系统(GHC 7.10.3,相同的编译选项)上,这导致最大时间(您在 OP 中提到的指标)减少了 ~22%。
注意。我在这里做了两个假设:
- 可变数据结构很适合解决这个问题(我想消息传递无论如何都意味着 IO)
- 您的 messageId 是连续的
通过一些额外的 Int
参数和算法(比如当 messageId 重置为 0 或 minBound
时),应该可以直接确定某个消息是否仍在历史记录中并检索它在环形缓冲区中形成相应的索引。
为了您的测试乐趣:
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Mutable as Vector
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
data Chan2 = Chan2
{ next :: !Int
, maxId :: !Int
, ringBuffer :: !(Vector.IOVector ByteString.ByteString)
}
chanSize :: Int
chanSize = 200000
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize
pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
let ix' = if ix == chanSize then 0 else ix + 1
in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if chanSize < Map.size inserted
then Map.deleteMin inserted
else inserted
main, main1, main2 :: IO ()
main = main2
main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
正如其他答案中提到的,GHC中的垃圾收集器遍历活数据,这意味着您在内存中存储的数据越长,GC停顿的时间就越长。
GHC 8.2
为了部分解决这个问题,在 GHC-8.2 中引入了一个名为 compact regions 的功能。它既是 GHC 运行时系统的一个特性,也是一个公开 方便 接口的库。紧凑区域功能允许将您的数据放在内存中的单独位置,并且 GC 不会在垃圾收集阶段遍历它。所以如果你有一个大的结构你想保存在内存中,考虑使用紧凑区域。然而,紧凑区域本身内部没有迷你垃圾收集器,它更适合append-only[=34] =] 数据结构,而不是像 HashMap
这样你也想删除东西的东西。虽然你可以克服这个问题。详情参考以下博客post:
GHC 8.10
此外,自 GHC-8.10 以来,实施了新的 low-latency incremental 垃圾收集器算法。这是一种替代的 GC 算法,默认情况下未启用,但您可以根据需要选择加入。因此,您可以将默认 GC 切换为较新的 GC,以自动获取 compact regions 提供的功能,而无需手动包装和展开。然而,新的 GC 并不是灵丹妙药,并不能自动解决所有问题,它也有其权衡取舍的地方。对于新 GC 的基准测试,请参考以下 GitHub 存储库:
我们正在开发一个接收和转发 "messages" 的程序,同时保留这些消息的临时历史记录,以便它可以在需要时告诉您消息历史记录。消息以数字方式标识,通常大小约为 1 KB,我们需要保留数十万条此类消息。
我们希望针对延迟优化此程序:发送和接收消息之间的时间必须低于 10 毫秒。
程序是用Haskell编写的,用GHC编译的。但是,我们发现垃圾收集暂停对于我们的延迟要求来说太长了:在我们的真实程序中超过 100 毫秒。
以下程序是我们应用程序的简化版本。它使用 Data.Map.Strict
来存储消息。消息是由 Int
标识的 ByteString
。以递增的数字顺序插入 1,000,000 条消息,并不断删除最旧的消息以将历史记录保持在最多 200,000 条消息。
module Main (main) where
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if 200000 < Map.size inserted
then Map.deleteMin inserted
else inserted
main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
我们编译并运行这个程序使用:
$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s
Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s
INIT time 0.000s ( 0.000s elapsed)
MUT time 0.652s ( 0.745s elapsed)
GC time 0.417s ( 0.530s elapsed)
EXIT time 0.010s ( 0.052s elapsed)
Total time 1.079s ( 1.326s elapsed)
%GC time 38.6% (40.0% elapsed)
Alloc rate 4,780,213,353 bytes per MUT second
Productivity 61.4% of total user, 49.9% of total elapsed
此处的重要指标是 "max pause" 0.0515 秒,即 51 毫秒。我们希望将其减少至少一个数量级。
实验表明,GC暂停的时间长度由历史消息的数量决定。这种关系大致是线性的,或者可能是超线性的。下面的 table 显示了这种关系。 (You can see our benchmarking tests here, and some charts here.)
msgs history length max GC pause (ms)
=================== =================
12500 3
25000 6
50000 13
100000 30
200000 56
400000 104
800000 199
1600000 487
3200000 1957
6400000 5378
我们已经对其他几个变量进行了实验,以确定它们是否可以减少这种延迟,none 其中有很大的不同。这些不重要的变量包括:优化(-O
、-O2
); RTS GC 选项(-G
、-H
、-A
、-c
)、核心数(-N
)、不同的数据结构(Data.Sequence
) 、消息的大小以及生成的短期垃圾的数量。压倒性的决定因素是历史消息的数量。
我们的工作理论是停顿与消息数量呈线性关系,因为每个 GC 周期都必须遍历所有工作的可访问内存并复制它,这显然是线性操作。
问题:
- 这个线性时间理论正确吗? GC 暂停的长度可以用这种简单的方式表示吗,还是现实更复杂?
- 如果GC停顿在工作内存中是线性的,有没有办法减少涉及的常数因子?
- 是否有增量 GC 或类似的选项?我们只能看到研究论文。我们非常愿意用吞吐量来换取更低的延迟。
- 除了拆分成多个进程之外,还有什么方法可以 "partition" 内存用于更小的 GC 周期?
你实际上做得很好,有 51 毫秒的暂停时间和超过 200Mb 的实时数据。我工作的系统有一个更大的最大暂停时间和一半的实时数据。
您的假设是正确的,主要的 GC 暂停时间与实时数据量成正比,不幸的是,目前的 GHC 无法解决这个问题。我们过去曾尝试过增量 GC,但这是一个研究项目,没有达到将其纳入已发布的 GHC 所需的成熟度。
我们希望将来能对此有所帮助的一件事是紧凑区域:https://phabricator.haskell.org/D1264。这是一种手动内存管理,您可以在堆中压缩一个结构,而 GC 不必遍历它。它最适合长期存在的数据,但也许它足以用于您设置中的单个消息。我们的目标是在 GHC 8.2.0 中使用它。
如果您处于分布式环境中并且拥有某种负载均衡器,则可以使用一些技巧来避免暂停命中,您基本上可以确保负载均衡器不会将请求发送到即将进行主要 GC 的机器,当然要确保机器仍然完成 GC,即使它没有收到请求。
嗯,您发现了具有 GC 的语言的局限性:它们不适合硬核实时系统。
您有 2 个选择:
1st 增加堆大小并使用 2 级缓存系统,将最旧的消息发送到磁盘并将最新的消息保留在内存中,您可以通过使用 OS 分页来实现。尽管使用此解决方案,但问题是分页可能会很昂贵,具体取决于所使用的辅助内存单元的读取能力。
第二个使用 'C' 解决方案的程序,并将其与 FFI 连接到 haskell。这样你就可以进行自己的内存管理。这将是最好的选择,因为您可以自己控制所需的内存。
我必须同意其他人的意见 - 如果您有严格的实时约束,那么使用 GC 语言并不理想。
但是,您可以考虑尝试其他可用的数据结构,而不仅仅是 Data.Map。
我使用 Data.Sequence 重写了它并获得了一些有希望的改进:
msgs history length max GC pause (ms)
=================== =================
12500 0.7
25000 1.4
50000 2.8
100000 5.4
200000 10.9
400000 21.8
800000 46
1600000 87
3200000 175
6400000 350
尽管您针对延迟进行了优化,但我注意到其他指标也有所改善。在 200000 的情况下,执行时间从 1.5s 下降到 0.2s,总内存使用量从 600MB 下降到 27MB。
请注意,我通过调整设计作弊:
- 我从
Msg
中删除了Int
,所以它不在两个地方。 - 我没有使用从
Int
s 到ByteString
s 的映射,而是使用了Sequence
的ByteString
s,而不是一个Int
每条消息,我认为可以用一个Int
来完成整个Sequence
。假设消息无法重新排序,您可以使用单个偏移量将您想要的消息转换到它在队列中的位置。
(我添加了一个附加函数 getMsg
来证明这一点。)
{-# LANGUAGE BangPatterns #-}
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S
newtype Msg = Msg ByteString.ByteString
data Chan = Chan Int (Seq ByteString.ByteString)
message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))
maxSize :: Int
maxSize = 200000
pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
Exception.evaluate $
let newSize = 1 + S.length sq
newSq = sq |> msgContent
in
if newSize <= maxSize
then Chan offset newSq
else
case S.viewl newSq of
(_ :< newSq') -> Chan (offset+1) newSq'
S.EmptyL -> error "Can't happen"
getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
where
getMsg' i
| i < 0 = Nothing
| i >= S.length sq = Nothing
| otherwise = Just (Msg (S.index sq i))
main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
我已经使用 IOVector
作为基础数据结构,通过环形缓冲区方法尝试了您的代码片段。在我的系统(GHC 7.10.3,相同的编译选项)上,这导致最大时间(您在 OP 中提到的指标)减少了 ~22%。
注意。我在这里做了两个假设:
- 可变数据结构很适合解决这个问题(我想消息传递无论如何都意味着 IO)
- 您的 messageId 是连续的
通过一些额外的 Int
参数和算法(比如当 messageId 重置为 0 或 minBound
时),应该可以直接确定某个消息是否仍在历史记录中并检索它在环形缓冲区中形成相应的索引。
为了您的测试乐趣:
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Mutable as Vector
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
data Chan2 = Chan2
{ next :: !Int
, maxId :: !Int
, ringBuffer :: !(Vector.IOVector ByteString.ByteString)
}
chanSize :: Int
chanSize = 200000
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize
pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
let ix' = if ix == chanSize then 0 else ix + 1
in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if chanSize < Map.size inserted
then Map.deleteMin inserted
else inserted
main, main1, main2 :: IO ()
main = main2
main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
正如其他答案中提到的,GHC中的垃圾收集器遍历活数据,这意味着您在内存中存储的数据越长,GC停顿的时间就越长。
GHC 8.2
为了部分解决这个问题,在 GHC-8.2 中引入了一个名为 compact regions 的功能。它既是 GHC 运行时系统的一个特性,也是一个公开 方便 接口的库。紧凑区域功能允许将您的数据放在内存中的单独位置,并且 GC 不会在垃圾收集阶段遍历它。所以如果你有一个大的结构你想保存在内存中,考虑使用紧凑区域。然而,紧凑区域本身内部没有迷你垃圾收集器,它更适合append-only[=34] =] 数据结构,而不是像 HashMap
这样你也想删除东西的东西。虽然你可以克服这个问题。详情参考以下博客post:
GHC 8.10
此外,自 GHC-8.10 以来,实施了新的 low-latency incremental 垃圾收集器算法。这是一种替代的 GC 算法,默认情况下未启用,但您可以根据需要选择加入。因此,您可以将默认 GC 切换为较新的 GC,以自动获取 compact regions 提供的功能,而无需手动包装和展开。然而,新的 GC 并不是灵丹妙药,并不能自动解决所有问题,它也有其权衡取舍的地方。对于新 GC 的基准测试,请参考以下 GitHub 存储库: