Haskell快速并发队列

Haskell fast concurrent queue

问题

您好!我正在编写一个日志记录库,我很想创建一个记录器,它将 运行 在单独的线程中,而所有应用程序线程只会向它发送消息。我想为这个问题找到最有效的解决方案。我在这里需要简单的 unboud 队列。

方法

我已经创建了一些测试来查看可用解决方案的性能,但我在这里得到了非常奇怪的结果。我测试了 4 个实现(下面提供了源代码)基于:

  1. pipes-concurrency
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. MVar based as described in the book "Parallel and Concurrent Programming in Haskell" 请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试

测试

这里是用于测试的源代码:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

你可以用 ghc -O2 Main.hs 编译它,只用 运行 编译它。 测试创建 20 个消息生成器,每个生成 1000000 条消息。

结果

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

问题

我很想问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶,MVar 是所有版本中最快的 - 谁能告诉更多,为什么我们得到这个结果,我们是否可以在任何情况下做得更好?

如果我不得不猜测为什么 pipes-concurrency 性能更差,那是因为每个读写都包含在一个 STM 事务中,而其他库使用更高效的低级并发原语。

所以我可以给你一些关于 ChanTQueuepipes-concurrency 在这里内部使用)的一些分析的概述,这些分析激发了一些设计决策unagi-chan。我不确定它是否会回答你的问题。我建议在进行基准测试时分叉不同的队列并尝试变体,以真正了解正在发生的事情。

Chan 看起来像:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

这是一个 MVar 的链表。 Chan 类型中的两个 MVar 分别充当指向列表当前头和尾的指针。这是写的样子:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

在 1 处,作者锁定了写入端,在 2 处,我们的项目 a 可供 reader 使用,在 3 处,写入端对其他作者解锁。

这实际上在 single-consumer/single-producer 场景中执行得很好(参见 the graph here),因为读取和写入不会争用。但是一旦你有多个并发编写器,你就会开始遇到麻烦:

  • 当另一个写入器处于 2 时命中 1 的写入器将阻塞并取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);有可能是速度慢得多的情况)。所以当你有很多作家在竞争时 你基本上是通过调度程序进行一次大往返,进入 MVar 的等待队列,然后最后写入可以完成。

  • 当写入器在 2 时被取消调度(因为它超时),它会持有一个锁,并且在可以再次重新调度之前不允许完成任何写入;当我们 过度订阅 时,即当我们的 threads/core 比率很高时,这会成为一个更大的问题。

最后,使用 MVar-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。

T队列

TQueue 很棒,因为 STM 使得推断其正确性变得非常简单。这是一个功能性的出列式队列,write 包括简单地读取写入器堆栈、构造我们的元素并将其写回:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

如果在 writeTQueue 写回其新堆栈后,另一个交错写入执行相同操作,将重试其中一个写入。随着更多 writeTQueue 交错,争用的影响会恶化。然而,性能下降比 Chan 慢得多,因为只有一个 writeTVar 操作可以使竞争 writeTQueues 无效,并且事务非常小(只是一个读取和一个 (:)).

读取工作通过 "dequeuing" 堆栈从写入端进行,反转它,并将反转的堆栈存储在它自己的变量中以便于 "popping"(总而言之,这给了我们摊销的 O(1)推和弹出)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

读者对作者有一个对称的适度争用问题。在一般情况下,readers 和 writers 不会竞争,但是当 reader 堆栈耗尽时,readers 会与其他 readers 和 writers 竞争。我怀疑如果您预加载了一个具有足够值的 TQueue,然后启动了 4 个 reader 和 4 个写入器,您可能会引发活锁,因为反向在下一次写入之前难以完成。值得注意的是,与 MVar 不同,对 TVar 的写入会同时唤醒许多 reader 正在等待的 TVar(这可能或多或少有效,具体取决于情景)。

我怀疑您在测试中没有看到 TQueue 的很多弱点;主要是您看到了写入争用的适度影响以及大量分配和 GC 处理大量可变对象的开销。

鳗鱼酱

unagi-chan 最初是为了很好地处理争用而设计的。概念上很简单,但是实现起来有些复杂

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

队列的读取和写入端共享 Stream,它们在上面协调传递值(从写入器到 reader)和阻塞指示(从 reader 到写入器),读写端各有一个独立的原子计数器。写法如下:

  1. a writer 在写计数器上调用原子 incrCounter 以接收其唯一索引,在该索引上与其(单个)reader

  2. 编写器找到它的单元并执行Written a

  3. 的CAS
  4. 如果成功它会退出,否则它会看到 reader 已经击败它并正在阻塞(或继续阻塞),所以它会执行 (\Blocking v)-> putMVar v a) 并退出。

阅读以类似且明显的方式工作。

第一个创新是使争用点成为一个原子操作,在争用下不会降级(就像 CAS/retry 循环或类似 Chan 的锁那样)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the atomic-primops library 效果最好。

然后在 2 中 reader 和 writer 只需要执行一次比较和交换(reader 的快速路径是一个简单的非原子读取)来完成协调。

所以为了让 unagi-chan 变得更好,我们

  • 使用fetch-and-add来处理争用点

  • 使用无锁技术,这样当我们超额订阅时,在不合时宜的时间取消调度的线程不会阻塞其他线程的进程(阻塞的写入器最多可能阻塞 reader "assigned" 到它;阅读 unagi-chan 文档中关于异步异常的注意事项,并注意 Chan 在这里有更好的语义)

  • 使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低并且对 GC 施加的压力更小

关于最后的注释。使用数组:并发写入数组通常不是扩展的好主意,因为当缓存行在写入线程之间来回无效时,会导致大量缓存一致性流量。通用术语是"false sharing"。但是在缓存方面也有优点,而我能想到的替代设计也有缺点,比如条带化写入或其他东西;我一直在对此进行一些试验,但目前还没有任何定论。

我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。