Haskell快速并发队列
Haskell fast concurrent queue
问题
您好!我正在编写一个日志记录库,我很想创建一个记录器,它将 运行 在单独的线程中,而所有应用程序线程只会向它发送消息。我想为这个问题找到最有效的解决方案。我在这里需要简单的 unboud 队列。
方法
我已经创建了一些测试来查看可用解决方案的性能,但我在这里得到了非常奇怪的结果。我测试了 4 个实现(下面提供了源代码)基于:
- pipes-concurrency
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- MVar based as described in the book "Parallel and Concurrent Programming in Haskell" 请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试
测试
这里是用于测试的源代码:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
你可以用 ghc -O2 Main.hs
编译它,只用 运行 编译它。
测试创建 20 个消息生成器,每个生成 1000000 条消息。
结果
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
问题
我很想问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶,MVar 是所有版本中最快的 - 谁能告诉更多,为什么我们得到这个结果,我们是否可以在任何情况下做得更好?
如果我不得不猜测为什么 pipes-concurrency
性能更差,那是因为每个读写都包含在一个 STM
事务中,而其他库使用更高效的低级并发原语。
所以我可以给你一些关于 Chan
和 TQueue
(pipes-concurrency
在这里内部使用)的一些分析的概述,这些分析激发了一些设计决策unagi-chan
。我不确定它是否会回答你的问题。我建议在进行基准测试时分叉不同的队列并尝试变体,以真正了解正在发生的事情。
陈
Chan
看起来像:
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
这是一个 MVar
的链表。 Chan
类型中的两个 MVar
分别充当指向列表当前头和尾的指针。这是写的样子:
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
在 1 处,作者锁定了写入端,在 2 处,我们的项目 a
可供 reader 使用,在 3 处,写入端对其他作者解锁。
这实际上在 single-consumer/single-producer 场景中执行得很好(参见 the graph here),因为读取和写入不会争用。但是一旦你有多个并发编写器,你就会开始遇到麻烦:
当另一个写入器处于 2 时命中 1 的写入器将阻塞并取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);有可能是速度慢得多的情况)。所以当你有很多作家在竞争时
你基本上是通过调度程序进行一次大往返,进入 MVar
的等待队列,然后最后写入可以完成。
当写入器在 2 时被取消调度(因为它超时),它会持有一个锁,并且在可以再次重新调度之前不允许完成任何写入;当我们 过度订阅 时,即当我们的 threads/core 比率很高时,这会成为一个更大的问题。
最后,使用 MVar
-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。
T队列
TQueue
很棒,因为 STM
使得推断其正确性变得非常简单。这是一个功能性的出列式队列,write
包括简单地读取写入器堆栈、构造我们的元素并将其写回:
data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
如果在 writeTQueue
写回其新堆栈后,另一个交错写入执行相同操作,将重试其中一个写入。随着更多 writeTQueue
交错,争用的影响会恶化。然而,性能下降比 Chan
慢得多,因为只有一个 writeTVar
操作可以使竞争 writeTQueue
s 无效,并且事务非常小(只是一个读取和一个 (:)
).
读取工作通过 "dequeuing" 堆栈从写入端进行,反转它,并将反转的堆栈存储在它自己的变量中以便于 "popping"(总而言之,这给了我们摊销的 O(1)推和弹出)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
读者对作者有一个对称的适度争用问题。在一般情况下,readers 和 writers 不会竞争,但是当 reader 堆栈耗尽时,readers 会与其他 readers 和 writers 竞争。我怀疑如果您预加载了一个具有足够值的 TQueue
,然后启动了 4 个 reader 和 4 个写入器,您可能会引发活锁,因为反向在下一次写入之前难以完成。值得注意的是,与 MVar
不同,对 TVar
的写入会同时唤醒许多 reader 正在等待的 TVar
(这可能或多或少有效,具体取决于情景)。
我怀疑您在测试中没有看到 TQueue
的很多弱点;主要是您看到了写入争用的适度影响以及大量分配和 GC 处理大量可变对象的开销。
鳗鱼酱
unagi-chan
最初是为了很好地处理争用而设计的。概念上很简单,但是实现起来有些复杂
data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
队列的读取和写入端共享 Stream
,它们在上面协调传递值(从写入器到 reader)和阻塞指示(从 reader 到写入器),读写端各有一个独立的原子计数器。写法如下:
a writer 在写计数器上调用原子 incrCounter
以接收其唯一索引,在该索引上与其(单个)reader
编写器找到它的单元并执行Written a
的CAS
如果成功它会退出,否则它会看到 reader 已经击败它并正在阻塞(或继续阻塞),所以它会执行 (\Blocking v)-> putMVar v a)
并退出。
阅读以类似且明显的方式工作。
第一个创新是使争用点成为一个原子操作,在争用下不会降级(就像 CAS/retry 循环或类似 Chan 的锁那样)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the atomic-primops
library 效果最好。
然后在 2 中 reader 和 writer 只需要执行一次比较和交换(reader 的快速路径是一个简单的非原子读取)来完成协调。
所以为了让 unagi-chan
变得更好,我们
使用fetch-and-add来处理争用点
使用无锁技术,这样当我们超额订阅时,在不合时宜的时间取消调度的线程不会阻塞其他线程的进程(阻塞的写入器最多可能阻塞 reader "assigned" 到它;阅读 unagi-chan
文档中关于异步异常的注意事项,并注意 Chan
在这里有更好的语义)
使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低并且对 GC 施加的压力更小
关于最后的注释。使用数组:并发写入数组通常不是扩展的好主意,因为当缓存行在写入线程之间来回无效时,会导致大量缓存一致性流量。通用术语是"false sharing"。但是在缓存方面也有优点,而我能想到的替代设计也有缺点,比如条带化写入或其他东西;我一直在对此进行一些试验,但目前还没有任何定论。
我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。
问题
您好!我正在编写一个日志记录库,我很想创建一个记录器,它将 运行 在单独的线程中,而所有应用程序线程只会向它发送消息。我想为这个问题找到最有效的解决方案。我在这里需要简单的 unboud 队列。
方法
我已经创建了一些测试来查看可用解决方案的性能,但我在这里得到了非常奇怪的结果。我测试了 4 个实现(下面提供了源代码)基于:
- pipes-concurrency
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- MVar based as described in the book "Parallel and Concurrent Programming in Haskell" 请注意,此技术为我们提供了容量为 1 的有界队列 - 它仅用于测试
测试
这里是用于测试的源代码:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
Pipes.performGC
pipesHandler max = loop 0
where
loop mnum = do
if mnum == max
then lift $ pure ()
else do event <- await
case event of
Msg _ -> loop (mnum + 1)
Status -> (lift $ putStrLn (show mnum)) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
handlerIO f max = loop 0 where
loop mnum = do
if mnum == max
then pure ()
else do event <- f
case event of
Msg _ -> loop (mnum + 1)
Status -> putStrLn (show mnum) *> loop mnum
Quit -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain [
bench "pipes" $ nfIO $ do
(output, input) <- Pipes.spawn Pipes.Unbounded
replicateM_ prodNum (pipesAddProducer msgNum output)
runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
, bench "Chan" $ nfIO $ do
ch <- newChan
replicateM_ prodNum (chanAddProducer msgNum ch)
chanHandler ch totalMsg
, bench "Unagi-Chan" $ nfIO $ do
(inCh, outCh) <- U.newChan
replicateM_ prodNum (uchanAddProducer msgNum inCh)
uchanHandler outCh totalMsg
, bench "MVar" $ nfIO $ do
m <- newEmptyMVar
replicateM_ prodNum (mvarAddProducer msgNum m)
mvarHandler m totalMsg
]
where
prodNum = 20
msgNum = 1000
totalMsg = msgNum * prodNum
你可以用 ghc -O2 Main.hs
编译它,只用 运行 编译它。
测试创建 20 个消息生成器,每个生成 1000000 条消息。
结果
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
问题
我很想问你为什么管道并发版本执行得这么慢,为什么它甚至比基于 chan 的版本慢得多。我很惊讶,MVar 是所有版本中最快的 - 谁能告诉更多,为什么我们得到这个结果,我们是否可以在任何情况下做得更好?
如果我不得不猜测为什么 pipes-concurrency
性能更差,那是因为每个读写都包含在一个 STM
事务中,而其他库使用更高效的低级并发原语。
所以我可以给你一些关于 Chan
和 TQueue
(pipes-concurrency
在这里内部使用)的一些分析的概述,这些分析激发了一些设计决策unagi-chan
。我不确定它是否会回答你的问题。我建议在进行基准测试时分叉不同的队列并尝试变体,以真正了解正在发生的事情。
陈
Chan
看起来像:
data Chan a
= Chan (MVar (Stream a)) -- pointer to "head", where we read from
(MVar (Stream a)) -- pointer to "tail", where values written to
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
这是一个 MVar
的链表。 Chan
类型中的两个 MVar
分别充当指向列表当前头和尾的指针。这是写的样子:
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
new_hole <- newEmptyMVar mask_ $ do
old_hole <- takeMVar writeVar -- [1]
putMVar old_hole (ChItem val new_hole) -- [2]
putMVar writeVar new_hole -- [3]
在 1 处,作者锁定了写入端,在 2 处,我们的项目 a
可供 reader 使用,在 3 处,写入端对其他作者解锁。
这实际上在 single-consumer/single-producer 场景中执行得很好(参见 the graph here),因为读取和写入不会争用。但是一旦你有多个并发编写器,你就会开始遇到麻烦:
当另一个写入器处于 2 时命中 1 的写入器将阻塞并取消调度(我能够测量上下文切换的最快速度是 ~150ns(非常快);有可能是速度慢得多的情况)。所以当你有很多作家在竞争时 你基本上是通过调度程序进行一次大往返,进入
MVar
的等待队列,然后最后写入可以完成。当写入器在 2 时被取消调度(因为它超时),它会持有一个锁,并且在可以再次重新调度之前不允许完成任何写入;当我们 过度订阅 时,即当我们的 threads/core 比率很高时,这会成为一个更大的问题。
最后,使用 MVar
-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。
T队列
TQueue
很棒,因为 STM
使得推断其正确性变得非常简单。这是一个功能性的出列式队列,write
包括简单地读取写入器堆栈、构造我们的元素并将其写回:
data TQueue a = TQueue (TVar [a])
(TVar [a])
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do
listend <- readTVar write -- a transaction with a consistent
writeTVar write (a:listend) -- view of memory
如果在 writeTQueue
写回其新堆栈后,另一个交错写入执行相同操作,将重试其中一个写入。随着更多 writeTQueue
交错,争用的影响会恶化。然而,性能下降比 Chan
慢得多,因为只有一个 writeTVar
操作可以使竞争 writeTQueue
s 无效,并且事务非常小(只是一个读取和一个 (:)
).
读取工作通过 "dequeuing" 堆栈从写入端进行,反转它,并将反转的堆栈存储在它自己的变量中以便于 "popping"(总而言之,这给了我们摊销的 O(1)推和弹出)
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
读者对作者有一个对称的适度争用问题。在一般情况下,readers 和 writers 不会竞争,但是当 reader 堆栈耗尽时,readers 会与其他 readers 和 writers 竞争。我怀疑如果您预加载了一个具有足够值的 TQueue
,然后启动了 4 个 reader 和 4 个写入器,您可能会引发活锁,因为反向在下一次写入之前难以完成。值得注意的是,与 MVar
不同,对 TVar
的写入会同时唤醒许多 reader 正在等待的 TVar
(这可能或多或少有效,具体取决于情景)。
我怀疑您在测试中没有看到 TQueue
的很多弱点;主要是您看到了写入争用的适度影响以及大量分配和 GC 处理大量可变对象的开销。
鳗鱼酱
unagi-chan
最初是为了很好地处理争用而设计的。概念上很简单,但是实现起来有些复杂
data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))
data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))
data Cell a = Empty | Written a | Blocking (MVar a)
队列的读取和写入端共享 Stream
,它们在上面协调传递值(从写入器到 reader)和阻塞指示(从 reader 到写入器),读写端各有一个独立的原子计数器。写法如下:
a writer 在写计数器上调用原子
incrCounter
以接收其唯一索引,在该索引上与其(单个)reader编写器找到它的单元并执行
Written a
的CAS
如果成功它会退出,否则它会看到 reader 已经击败它并正在阻塞(或继续阻塞),所以它会执行
(\Blocking v)-> putMVar v a)
并退出。
阅读以类似且明显的方式工作。
第一个创新是使争用点成为一个原子操作,在争用下不会降级(就像 CAS/retry 循环或类似 Chan 的锁那样)。基于简单的基准测试和实验,fetch-and-add primop, exposed by the atomic-primops
library 效果最好。
然后在 2 中 reader 和 writer 只需要执行一次比较和交换(reader 的快速路径是一个简单的非原子读取)来完成协调。
所以为了让 unagi-chan
变得更好,我们
使用fetch-and-add来处理争用点
使用无锁技术,这样当我们超额订阅时,在不合时宜的时间取消调度的线程不会阻塞其他线程的进程(阻塞的写入器最多可能阻塞 reader "assigned" 到它;阅读
unagi-chan
文档中关于异步异常的注意事项,并注意Chan
在这里有更好的语义)使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低并且对 GC 施加的压力更小
关于最后的注释。使用数组:并发写入数组通常不是扩展的好主意,因为当缓存行在写入线程之间来回无效时,会导致大量缓存一致性流量。通用术语是"false sharing"。但是在缓存方面也有优点,而我能想到的替代设计也有缺点,比如条带化写入或其他东西;我一直在对此进行一些试验,但目前还没有任何定论。
我们合理关注虚假共享的一个地方是在我们的计数器中,我们将其对齐并填充为 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。