减少 Haskell 程序中的垃圾收集暂停时间

Reducing garbage-collection pause time in a Haskell program

我们正在开发一个接收和转发 "messages" 的程序,同时保留这些消息的临时历史记录,以便它可以在需要时告诉您消息历史记录。消息以数字方式标识,通常大小约为 1 KB,我们需要保留数十万条此类消息。

我们希望针对延迟优化此程序:发送和接收消息之间的时间必须低于 10 毫秒。

程序是用Haskell编写的,用GHC编译的。但是,我们发现垃圾收集暂停对于我们的延迟要求来说太长了:在我们的真实程序中超过 100 毫秒。

以下程序是我们应用程序的简化版本。它使用 Data.Map.Strict 来存储消息。消息是由 Int 标识的 ByteString。以递增的数字顺序插入 1,000,000 条消息,并不断删除最旧的消息以将历史记录保持在最多 200,000 条消息。

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

我们编译并运行这个程序使用:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

此处的重要指标是 "max pause" 0.0515 秒,即 51 毫秒。我们希望将其减少至少一个数量级。

实验表明,GC暂停的时间长度由历史消息的数量决定。这种关系大致是线性的,或者可能是超线性的。下面的 table 显示了这种关系。 (You can see our benchmarking tests here, and some charts here.)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

我们已经对其他几个变量进行了实验,以确定它们是否可以减少这种延迟,none 其中有很大的不同。这些不重要的变量包括:优化(-O-O2); RTS GC 选项(-G-H-A-c)、核心数(-N)、不同的数据结构(Data.Sequence) 、消息的大小以及生成的短期垃圾的数量。压倒性的决定因素是历史消息的数量。

我们的工作理论是停顿与消息数量呈线性关系,因为每个 GC 周期都必须遍历所有工作的可访问内存并复制它,这显然是线性操作。

问题:

你实际上做得很好,有 51 毫秒的暂停时间和超过 200Mb 的实时数据。我工作的系统有一个更大的最大暂停时间和一半的实时数据。

您的假设是正确的,主要的 GC 暂停时间与实时数据量成正比,不幸的是,目前的 GHC 无法解决这个问题。我们过去曾尝试过增量 GC,但这是一个研究项目,没有达到将其纳入已发布的 GHC 所需的成熟度。

我们希望将来能对此有所帮助的一件事是紧凑区域:https://phabricator.haskell.org/D1264。这是一种手动内存管理,您可以在堆中压缩一个结构,而 GC 不必遍历它。它最适合长期存在的数据,但也许它足以用于您设置中的单个消息。我们的目标是在 GHC 8.2.0 中使用它。

如果您处于分布式环境中并且拥有某种负载均衡器,则可以使用一些技巧来避免暂停命中,您基本上可以确保负载均衡器不会将请求发送到即将进行主要 GC 的机器,当然要确保机器仍然完成 GC,即使它没有收到请求。

嗯,您发现了具有 GC 的语言的局限性:它们不适合硬核实时系统。

您有 2 个选择:

1st 增加堆大小并使用 2 级缓存系统,将最旧的消息发送到磁盘并将最新的消息保留在内存中,您可以通过使用 OS 分页来实现。尽管使用此解决方案,但问题是分页可能会很昂贵,具体取决于所使用的辅助内存单元的读取能力。

第二个使用 'C' 解决方案的程序,并将其与 FFI 连接到 haskell。这样你就可以进行自己的内存管理。这将是最好的选择,因为您可以自己控制所需的内存。

我必须同意其他人的意见 - 如果您有严格的实时约束,那么使用 GC 语言并不理想。

但是,您可以考虑尝试其他可用的数据结构,而不仅仅是 Data.Map。

我使用 Data.Sequence 重写了它并获得了一些有希望的改进:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

尽管您针对延迟进行了优化,但我注意到其他指标也有所改善。在 200000 的情况下,执行时间从 1.5s 下降到 0.2s,总内存使用量从 600MB 下降到 27MB。

请注意,我通过调整设计作弊:

  • 我从 Msg 中删除了 Int,所以它不在两个地方。
  • 我没有使用从 Ints 到 ByteStrings 的映射,而是使用了 SequenceByteStrings,而不是一个 Int每条消息,我认为可以用一个 Int 来完成整个 Sequence。假设消息无法重新排序,您可以使用单个偏移量将您想要的消息转换到它在队列中的位置。

(我添加了一个附加函数 getMsg 来证明这一点。)

{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])

我已经使用 IOVector 作为基础数据结构,通过环形缓冲区方法尝试了您的代码片段。在我的系统(GHC 7.10.3,相同的编译选项)上,这导致最大时间(您在 OP 中提到的指标)减少了 ~22%。

注意。我在这里做了两个假设:

  1. 可变数据结构很适合解决这个问题(我想消息传递无论如何都意味着 IO)
  2. 您的 messageId 是连续的

通过一些额外的 Int 参数和算法(比如当 messageId 重置为 0 或 minBound 时),应该可以直接确定某个消息是否仍在历史记录中并检索它在环形缓冲区中形成相应的索引。

为了您的测试乐趣:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix == chanSize then 0 else ix + 1
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])

正如其他答案中提到的,GHC中的垃圾收集器遍历活数据,这意味着您在内存中存储的数据越长,GC停顿的时间就越长。

GHC 8.2

为了部分解决这个问题,在 GHC-8.2 中引入了一个名为 compact regions 的功能。它既是 GHC 运行时系统的一个特性,也是一个公开 方便 接口的库。紧凑区域功能允许将您的数据放在内存中的单独位置,并且 GC 不会在垃圾收集阶段遍历它。所以如果你有一个大的结构你想保存在内存中,考虑使用紧凑区域。然而,紧凑区域本身内部没有迷你垃圾收集器,它更适合append-only[=34] =] 数据结构,而不是像 HashMap 这样你也想删除东西的东西。虽然你可以克服这个问题。详情参考以下博客post:

GHC 8.10

此外,自 GHC-8.10 以来,实施了新的 low-latency incremental 垃圾收集器算法。这是一种替代的 GC 算法,默认情况下未启用,但您可以根据需要选择加入。因此,您可以将默认 GC 切换为较新的 GC,以自动获取 compact regions 提供的功能,而无需手动包装和展开。然而,新的 GC 并不是灵丹妙药,并不能自动解决所有问题,它也有其权衡取舍的地方。对于新 GC 的基准测试,请参考以下 GitHub 存储库: