runInBoundThread 是最好的并行工具吗?

Is runInBoundThread the best tool for parallelism?

说,我想fold monoids 并行。我的电脑有8个核心。我有这个功能可以将列表拆分为大小相等的较小列表(具有有界模偏差):

import Data.List

parallelize :: Int -> [a] -> [[a]]
parallelize 0 _ = []
parallelize n [] = replicate n []
parallelize n xs = let
    (us,vs) = splitAt (quot (length xs) n) xs
    in us : parallelize (n-1) vs

我做的并行fold的第一个版本是:

import Control.Concurrent
import Control.Concurrent.QSemN
import Data.Foldable
import Data.IORef

foldP :: Monoid m => [m] -> IO m
foldP xs = do
    result <- newIORef mempty
    sem <- newQSemN 0
    n <- getNumCapabilities
    let yss = parallelize n xs
    for_ yss (\ys -> forkIO (modifyIORef result (fold ys <>) >> signalQSemN sem 1))
    waitQSemN sem n
    readIORef result

但是 IORefs 和信号量的用法对我来说似乎很难看。所以我做了另一个版本:

import Data.Traversable

foldP :: Monoid m => [m] -> IO m
foldP xs = do
    n <- getNumCapabilities
    let yss = parallelize n xs
    rs <- for yss (\ys -> runInUnboundThread (return (fold ys)))
    return (fold rs)

我使用的测试代码是:

import Data.Monoid
import System.CPUTime

main :: IO ()
main = do
    start <- getCPUTime
    Product result <- foldP (fmap Product [1 .. 100])
    end <- getCPUTime
    putStrLn ("Time took: " ++ show (end - start) ++ "ps.")
    putStrLn ("Result: " ++ show result)

foldP 的第二个版本优于第一个版本。当我使用 runInBoundThread 而不是 runInUnboundThread 时,它变得更快。

这些性能差异是由什么造成的?

TLDR;使用 fold function from massiv 包,您可能会在 Haskell.

中获得最有效的解决方案

首先我想说的是,人们在尝试实现这样的并发模式时忘记的第一件事就是异常处理。在问题的解决方案中,异常处理是不存在的,因此这是完全错误的。因此,我建议对常见的并发模式使用现有的实现。 async 是用于并发的 goto 库,但对于这种用例,它不是最有效的解决方案。

这个特殊的例子可以很容易地用 scheduler 包来解决,事实上它正是为它设计的那种东西。以下是如何使用它实现幺半群的折叠:

import Control.Scheduler
import Control.Monad.IO.Unlift

foldP :: (MonadUnliftIO m, Monoid n) => Comp -> [n] -> m n
foldP comp xs = do
  rs <-
    withScheduler comp $ \scheduler ->
      mapM_ (scheduleWork scheduler . pure . fold) (parallelize (numWorkers scheduler) xs)
  pure $ fold rs

有关最佳并行化策略的说明,请参阅 Comp 类型。根据我在实践中发现 Par 通常效果最好,因为它将使用由 forkOn

创建的固定线程

注意parallelize函数实现起来效率低下也很危险,还是这样写比较好:


parallelize :: Int -> [a] -> [[a]]
parallelize n' xs' = go 0 id xs'
  where
    n = max 1 n'
    -- at least two elements make sense to get benefit of parallel fold
    k = max 2 $ quot (length xs') n
    go i acc xs
      | null xs = acc []
      | i < n =
        case splitAt k xs of
          (ls, rs) -> go (i + 1) (acc . (ls :)) rs
      | otherwise = acc . (xs:) $ []

还有一点建议是,列表对于一般的并行化和效率而言远非理想的数据结构。为了在并行计算之前将列表分成块,您已经必须使用 parallelize 遍历数据结构,如果您要使用数组,则可以避免这种情况。正如本答案开头所建议的那样,我得到的是改用数组。