对于大型列表,为什么 `filterM + mapM_` 比 `mapM_ + when` 慢得多?

Why is `filterM + mapM_` so much slower than `mapM_ + when`, with large lists?

我不太了解 Haskell 优化在内部是如何工作的,但我一直在使用过滤器,非常希望它们被优化成与 C++ 中的简单 if 等效的东西。例如

mapM_ print $ filter (\n -> n `mod` 2 == 0) [0..10]

将编译成等同于

for (int i = 0; i < 10; i++)
    if (i%2 == 0)
        printf("%d\n", i);

对于长列表(10 000 000 个元素),基本 filter 似乎是正确的,但如果我使用 monadic filterM,就会有很大的不同。我为此速度测试编写了一段代码,很明显 filterM 的使用比使用 when.

的命令式方法持续时间更长(250 倍)
import Data.Array.IO
import Control.Monad
import System.CPUTime

main :: IO ()
main = do
  start <- getCPUTime
  arr <- newArray (0, 100) 0 :: IO (IOUArray Int Int)
  let
    okSimple i =
      i < 100

    ok i = do
      return $ i < 100
    -- -- of course we don't need IO for a simple i < 100
    -- -- but my goal is to ask for the contents of the array, e.g.
    -- ok i = do
    --   current <- readArray arr (i `mod` 101)
    --   return$ i `mod` 37 > current `mod` 37
    
    write :: Int -> IO ()
    write i =
      writeArray arr (i `mod` 101) i

    writeIfOkSimple :: Int -> IO ()
    writeIfOkSimple i =
      when (okSimple i) $ write i

    writeIfOk :: Int -> IO ()
    writeIfOk i =
      ok i >>= (\isOk -> when isOk $ write i)

  -------------------------------------------------------------------
  ---- these four methods have approximately same execution time ----
  ---- (but the last one is executed on 250 times shorter list)  ----
  -------------------------------------------------------------------
  -- mapM_ write$ filter okSimple [0..10000000*250] -- t = 20.694
  -- mapM_ writeIfOkSimple [0..10000000*250]        -- t = 20.698
  -- mapM_ writeIfOk [0..10000000*250]              -- t = 20.669
  filterM ok [0..10000000] >>= mapM_ write          -- t = 17.200

  -- evaluate array
  elems <- getElems arr
  print $ sum elems

  end <- getCPUTime
  print $ fromIntegral (end - start) / (10^12)

我的问题是:两种方法(使用 writeIfOk / 使用 filterM okwrite)不应该编译成相同的代码(迭代列表、询问条件、写入数据) )?如果不是,我可以做些什么(重写代码、添加编译标志、使用内联编译指示或其他东西)使它们在计算上等效,还是我应该在性能至关重要时始终使用 when

把这个问题归结为本质,你问的是

f (filter g xs)

f =<< filterM (pure . g) xs

这基本上归结为懒惰。 filter g xs 根据需要递增地生成其结果,只走 xs 足够远以找到结果的下一个元素。 filterM 定义如下:

filterM _p [] = pure []
filterM p (x : xs)
  = liftA2 (\q r -> if q then x : r else r)
           (p x)
           (filterM p xs)

由于 IO 是一个“严格”的应用程序,因此在遍历整个列表之前不会产生任何结果,在内存中累积 p x 结果。