Strange profiling overhead for IOArray and STArray

I'm testing the speed of various memoization approaches. The code below compares two implementations that use an array for memoization. I tested this on a recursive function; the full code follows.

Running stack test for memoweird 1000, memoweird 5000, and so on shows that IOArray is consistently a few seconds faster than STArray, and the difference appears to be O(1). However, running the same programs with stack test --profile reverses the result: STArray becomes consistently faster by about a second.

{-# LANGUAGE ScopedTypeVariables #-}
module Main where

import Data.Array
import Data.Array.ST
import Control.Monad.ST
import Data.Array.IO
import GHC.IO
import Control.Monad
import Data.Time

memoST :: forall a b. (Ix a)
     => (a, a)    -- range of the argument memoized
     => ((a -> b) -- a recursive function, but uses its first argument for recursive calls instead
       -> a -> b)
     -> (a -> b)  -- memoized function
memoST r f = (runSTArray compute !)
    where
        compute :: ST s (STArray s a b)
        compute = do
            arr <- newArray_ r
            forM_ (range r) (\i -> do
                writeArray arr i $ f (memoST r f) i)
            return arr

memoArray :: forall a b. (Ix a)
     => (a, a)
     -> ((a -> b) -> a -> b)
     -> a -> b
memoArray r f = (unsafePerformIO compute !)  -- safe!
    where
        compute :: IO (Array a b)
        compute = do
            arr <- newArray_ r :: IO (IOArray a b)
            forM_ (range r) (\i -> do
                writeArray arr i $ f (memoArray r f) i)
            freeze arr

weird :: (Int -> Int) -> Int -> Int
weird _ 0 = 0
weird _ 1 = 0
weird f i = f (i `div` 2) + f (i - 1) + 1

stweird :: Int -> Int
stweird n = memoST (0,n) weird n
arrayweird :: Int -> Int
arrayweird n = memoArray (0,n) weird n

main :: IO()
main = do
    t0 <- getCurrentTime
    print (stweird 5000)
    t1 <- getCurrentTime
    print (arrayweird 5000)
    t2 <- getCurrentTime
    let sttime = diffUTCTime t0 t1
    let artime = diffUTCTime t1 t2
    print (sttime - artime)

Is there a reason the profiling overhead for the two array types is so different (even though it's small)?

I'm using Stack version 2.7.3 and GHC version 8.10.4 on OS X.


Some data from my computer.

Running this a few times:

Without Profiling:
 -0.222663s -0.116007s -0.202765s -0.205319s -0.130202s
  Avg -0.1754s
  Std  0.0486s
With Profiling:
 0.608895s -0.755541s -0.61222s -0.83613s 0.450045s
 1.879662s -0.181789s 3.251379s 0.359211s 0.122721s
  Avg  0.4286s
  Std  1.2764s

Clearly, random fluctuations from the profiler swamp whatever difference there is. The data here aren't sufficient to confirm a difference.

You really should use criterion for benchmarking.
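
A minimal sketch of such a benchmark, assuming the stweird and marrayweird definitions from the full code at the end of this answer are in scope (whnf forces each result just far enough that the memoized computation actually runs inside the benchmark loop):

import Criterion.Main (bench, defaultMain, whnf)

-- Benchmark the two memoized functions at the same input size as above.
main :: IO ()
main = defaultMain
    [ bench "stweird"     $ whnf stweird 5000
    , bench "marrayweird" $ whnf marrayweird 5000
    ]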

benchmarking stweird
time                 3.116 s    (3.109 s .. 3.119 s)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 3.112 s    (3.110 s .. 3.113 s)
std dev              2.220 ms   (953.8 μs .. 2.807 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking marrayweird
time                 3.170 s    (2.684 s .. 3.602 s)
                     0.997 R²   (0.989 R² .. 1.000 R²)
mean                 3.204 s    (3.148 s .. 3.280 s)
std dev              72.66 ms   (1.810 ms .. 88.94 ms)
variance introduced by outliers: 19% (moderately inflated)

My system is noisy, but it does look like the standard deviations really don't overlap. Still, I'm not actually very interested in tracking down the cause, because that code is very slow. Three seconds to memoize 5000 operations? Something has gone badly wrong.

The code as written is a super-exponential algorithm: there is no sharing of the memoized function in the memoizing code. Each sub-evaluation can create a brand-new array and fill it in. Two things save you from that. The first is laziness - most of the values are never evaluated. The upshot is that the algorithm actually terminates rather than eagerly evaluating array entries forever. The second, and more important, is that GHC does some constant floating that lifts the expression (memoST r f) (or the memoArray equivalent) out of the loop body. That creates sharing within each loop body, so the two sub-calls actually share memoization. It's not great, but it does provide some speedup. That's mostly accidental, though.
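
To make that second point concrete, here is a sketch (my illustration, not the original code or GHC's literal output) of roughly what that floating amounts to: memoST r f gets bound once, outside the loop lambda, so the recursive sub-calls made while filling a given array all go through the same memo table instead of each rebuilding it.

memoSTFloated
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoSTFloated r f = (runSTArray compute !)
  where
    -- Roughly what full laziness produces: a single shared binding for the
    -- recursive memoized function, rather than a fresh expression per entry.
    shared :: a -> b
    shared = memoSTFloated r f

    compute :: ST s (STArray s a b)
    compute = do
        arr <- newArray_ r
        forM_ (range r) $ \i ->
            writeArray arr i $ f shared i
        return arr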

The traditional approach to this sort of memoization is to let laziness do the necessary work:

memoArray
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoArray r f = fetch
  where
    fetch n = arr ! n
    arr = listArray r $ map (f fetch) (range r)
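
For a sense of how this combinator is used, here is a hypothetical usage sketch (not from the original post; the names fib and fibStep are mine) that memoizes Fibonacci over a fixed range:

-- Hypothetical usage sketch: Fibonacci memoized over [0, 100].
fib :: Int -> Integer
fib = memoArray (0, 100) fibStep
  where
    -- The first argument of fibStep is the recursive call to route through
    -- the memo table, just like the first argument of weird in the question.
    fibStep :: (Int -> Integer) -> Int -> Integer
    fibStep _    0 = 0
    fibStep _    1 = 1
    fibStep self n = self (n - 1) + self (n - 2)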

Note the knot-tying between the internal fetch and arr. This ensures that the same array is used in every computation. It benchmarks quite a bit better:

benchmarking arrayweird
time                 212.0 μs   (211.5 μs .. 212.6 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 213.3 μs   (212.4 μs .. 215.0 μs)
std dev              4.104 μs   (2.469 μs .. 6.194 μs)
variance introduced by outliers: 12% (moderately inflated)

213 microseconds is still a lot more than I'd expect for only 5000 iterations. One might wonder, though, whether explicit sharing can be made to work with the other variants as well. It can:

memoST'
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoST' r f = fetch
  where
    fetch n = arr ! n

    arr = runSTArray compute

    compute :: ST s (STArray s a b)
    compute = do
        a <- newArray_ r
        forM_ (range r) $ \i -> do
            writeArray a i $ f fetch i
        return a

memoMArray'
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoMArray' r f = fetch
  where
    fetch n = arr ! n

    arr = unsafePerformIO compute

    compute :: IO (Array a b)
    compute = do
        a <- newArray_ r :: IO (IOArray a b)
        forM_ (range r) $ \i -> do
            writeArray a i $ f fetch i
        freeze a

Those use explicit sharing to introduce the same sort of knot-tying, though it's significantly more indirect.

benchmarking stweird'
time                 168.1 μs   (167.1 μs .. 169.9 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 167.1 μs   (166.7 μs .. 167.8 μs)
std dev              1.636 μs   (832.3 ns .. 3.007 μs)

benchmarking marrayweird'
time                 171.1 μs   (170.7 μs .. 171.7 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 170.9 μs   (170.5 μs .. 171.4 μs)
std dev              1.554 μs   (1.076 μs .. 2.224 μs)

And those actually seem to beat the listArray variant. I really don't know what's going on there; listArray must be doing some surprising extra work. Oh well.

In the end, I don't actually know what causes these small performance differences. But none of them matter compared to actually using an efficient algorithm.

The full code, for your perusal:

{-# LANGUAGE ScopedTypeVariables #-}
module Main where

import Data.Array
import Data.Array.ST
import Data.Array.Unsafe
import Control.Monad.ST
import Data.Array.IO
import GHC.IO.Unsafe
import Control.Monad
import Criterion.Main


memoST
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoST r f = (runSTArray compute !)
  where
    compute :: ST s (STArray s a b)
    compute = do
        arr <- newArray_ r
        forM_ (range r) $ \i -> do
            writeArray arr i $ f (memoST r f) i
        return arr

memoMArray
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoMArray r f = (unsafePerformIO compute !)
  where
    compute :: IO (Array a b)
    compute = do
        arr <- newArray_ r :: IO (IOArray a b)
        forM_ (range r) $ \i -> do
            writeArray arr i $ f (memoMArray r f) i
        freeze arr




memoArray
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoArray r f = fetch
  where
    fetch n = arr ! n
    arr = listArray r $ map (f fetch) (range r)




memoST'
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoST' r f = fetch
  where
    fetch n = arr ! n

    arr = runSTArray compute

    compute :: ST s (STArray s a b)
    compute = do
        a <- newArray_ r
        forM_ (range r) $ \i -> do
            writeArray a i $ f fetch i
        return a

memoMArray'
    :: forall a b. (Ix a)
    => (a, a)
    -> ((a -> b) -> a -> b)
    -> a -> b
memoMArray' r f = fetch
  where
    fetch n = arr ! n

    arr = unsafePerformIO compute

    compute :: IO (Array a b)
    compute = do
        a <- newArray_ r :: IO (IOArray a b)
        forM_ (range r) $ \i -> do
            writeArray a i $ f fetch i
        freeze a


weird :: (Int -> Int) -> Int -> Int
weird _ 0 = 0
weird _ 1 = 0
weird f i = f (i `div` 2) + f (i - 1) + 1

stweird :: Int -> Int
stweird n = memoST (0, n) weird n

marrayweird :: Int -> Int
marrayweird n = memoMArray (0, n) weird n

arrayweird :: Int -> Int
arrayweird n = memoArray (0, n) weird n

stweird' :: Int -> Int
stweird' n = memoST' (0, n) weird n

marrayweird' :: Int -> Int
marrayweird' n = memoMArray' (0, n) weird n

main :: IO()
main = do
    let rounds = 5000
    print $ stweird rounds
    print $ marrayweird rounds
    print $ arrayweird rounds
    print $ stweird' rounds
    print $ marrayweird' rounds
    putStrLn ""

    defaultMain
       [ bench "stweird" $ whnf stweird rounds
       , bench "marrayweird" $ whnf marrayweird rounds
       , bench "arrayweird" $ whnf arrayweird rounds
       , bench "stweird'" $ whnf stweird' rounds
       , bench "marrayweird'" $ whnf marrayweird' rounds
       ]