Haskell: 我可以直接将整数读入数组吗?

Haskell: Can I read integers directly into an array?

In this programming problem, the input is an n×m integer matrix. Typically, n≈ 105 and m ≈ 10. The official solution(1606D,教程)非常必要:它涉及一些矩阵操作、预计算和聚合。为了好玩,我把它当作一个 STUArray 实现练习。

问题

我已经设法使用 STUArray 实现了它,但程序仍然比允许的 way more memory (256MB)。即使在本地 运行 时,最大驻留集大小也是 >400 MB。在分析中,从 stdin 读取似乎占主导地位的内存占用:

函数 readvreadv.readInt 负责解析整数并将它们保存到二维列表中,占用大约 50-70 MB,而不是大约 16 MB = (106 个整数) × (每个整数 8 个字节 + 每个 link 个 8 字节).

我是否有希望总内存低于 256 MB?我已经在使用 Text 包作为输入。也许我应该完全避免使用列表,直接从标准输入读取 整数 到数组。我们该怎么做? 或者,问题出在其他地方吗?

代码

{-# OPTIONS_GHC -O2 #-}
module CF1606D where
import qualified Data.Text as T
import qualified Data.Text.IO as TI
import qualified Data.Text.Read as TR
import Control.Monad
import qualified Data.List as DL
import qualified Data.IntSet as DS
import Control.Monad.ST
import Data.Array.ST.Safe
import Data.Int (Int32)
import Data.Array.Unboxed

solve :: IO ()
solve =  do
  ~[n,m] <- readv      
  -- 2D list
  input <- {-# SCC input #-} replicateM (fromIntegral n) readv     
  let
      ints = [1..]
      sorted = DL.sortOn (head.fst) (zip input ints)
      (rows,indices) = {-# SCC rows_inds #-} unzip sorted    
      -- 2D list converted into matrix:
      matrix = mat (fromIntegral n) (fromIntegral m) rows           
      infinite = 10^7
      asc x y = [x,x+1..y]
      desc x y = [y,y-1..x]    
      -- Four prefix-matrices:
      tlMax = runSTUArray $ prefixMat max 0 asc asc (subtract 1) (subtract 1) =<< matrix
      blMin = runSTUArray $ prefixMat min infinite desc asc (+1) (subtract 1) =<< matrix
      trMin = runSTUArray $ prefixMat min infinite asc desc (subtract 1) (+1) =<< matrix
      brMax = runSTUArray $ prefixMat max 0 desc desc (+1) (+1) =<< matrix    
      good _ (i,j)
        | tlMax!(i,j) < blMin!(i+1,j) && brMax!(i+1,j+1) < trMin!(i,j+1) = Left (i,j)
        | otherwise = Right ()
      {-# INLINABLE good #-}
      nearAns = foldM good () [(i,j)|i<-[1..n-1],j<-[1..m-1]]
      ans = either (\(i,j)-> "YES\n" ++ color n (take i indices) ++ " " ++ show j) (const "NO") nearAns
  putStrLn ans

type I = Int32
type S s = (STUArray s (Int, Int) I)
type R = Int -> Int -> [Int]
type F = Int -> Int

mat :: Int -> Int -> [[I]] -> ST s (S s)
mat n m rows = newListArray ((1,1),(n,m)) $ concat rows

prefixMat :: (I->I->I) -> I -> R -> R -> F -> F -> S s -> ST s (S s)
prefixMat opt worst ordi ordj previ prevj mat = do
  ((ilo,jlo),(ihi,jhi)) <- getBounds mat
  pre <- newArray ((ilo-1,jlo-1),(ihi+1,jhi+1)) worst
  forM_ (ordi ilo ihi) $ \i-> do
    forM_ (ordj jlo jhi) $ \j -> do
      matij <- readArray mat (i,j)
      prei <- readArray pre (previ i,j)
      prej <- readArray pre (i, prevj j)
      writeArray pre (i,j) (opt (opt prei prej) matij)
  return pre

color :: Int -> [Int] -> String
color n inds = let
  temp = DS.fromList inds
  colors = [if DS.member i temp then 'B' else 'R' | i<-[1..n]]
  in colors

readv :: Integral t => IO [t]
readv = map readInt . T.words <$> TI.getLine where
  readInt = fromIntegral . either (const 0) fst . TR.signed TR.decimal
{-# INLINABLE readv #-}

main :: IO ()
main = do
  ~[n] <- readv
  replicateM_ n solve

上面代码的简要说明:

  1. 读取 n 行,每行有 m 个整数。
  2. 按第一个元素对行进行排序。
  3. 现在计算四个 'prefix matrices',每个角一个。对于左上角和右下角,它是前缀最大值,对于其他两个角,它是我们需要计算的前缀最小值。
  4. 找到这些前缀矩阵满足以下条件的单元格 [i,j]:top_left [i,j] < bottom_left [i,j] 和 top_right [i,j] > bottom_right [i,j]
  5. 对于第 1 行到第 i 行,将它们的原始索引(即在未排序的输入矩阵中的位置)标记为蓝色。将其余标记为红色。

示例输入和命令

示例输入:inp3.txt.

命令:

> stack ghc -- -main-is CF1606D.main -with-rtsopts="-s -h -p -P" -rtsopts -prof -fprof-auto CF1606D
> gtime -v ./CF1606D < inp3.txt > outp
    ...
    ...
    MUT     time    2.990s  (  3.744s elapsed)    #    RTS -s output
    GC      time    4.525s  (  6.231s elapsed)    #    RTS -s output
    ...
    ...
    Maximum resident set size (kbytes): 408532    #    >256 MB (gtime output)

> stack exec -- hp2ps -t0.1 -e8in -c CF1606D.hp && open CF1606D.ps

关于 GC 的问题: 如上面 +RTS -s 输出所示,GC 似乎比实际逻辑执行花费的时间更长。这是正常的吗?有没有办法可视化 GC activity 随着时间的推移?我尝试使矩阵严格,但没有任何影响。

可能这根本不是一个功能友好的问题(尽管我很乐意被反驳)。例如,Java 也使用 GC,但有很多成功的 Java 提交。 尽管如此,我还是想看看我能走多远。谢谢!

与普遍看法相反,Haskell 对此类问题非常友好。真正的问题是 GHC 附带的 array 库完全是垃圾。另一个大问题是 Haskell 教导每个人在应该使用数组的地方使用列表,这通常是缓慢代码和内存膨胀程序的主要来源之一。因此,GC 花费很长时间也就不足为奇了,这是因为分配的东西太多了。这是下面提供的解决方案的提供输入的 运行:

   1,483,547,096 bytes allocated in the heap
         566,448 bytes copied during GC
      18,703,640 bytes maximum residency (3 sample(s))
       1,223,400 bytes maximum slop
              32 MiB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      1399 colls,     0 par    0.009s   0.009s     0.0000s    0.0011s
  Gen  1         3 colls,     0 par    0.002s   0.002s     0.0006s    0.0016s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.001s elapsed)
  MUT     time    0.484s  (  0.517s elapsed)
  GC      time    0.011s  (  0.011s elapsed)
  EXIT    time    0.001s  (  0.002s elapsed)
  Total   time    0.496s  (  0.530s elapsed)

下面提供的解决方案使用数组库massiv,导致无法提交给codeforces。但是,希望目标是在 Haskell 上变得更好,而不是在某些网站上获得积分。

红蓝矩阵可以分为两个阶段:读取求解

阅读

阅读尺寸

main 函数中,我们只读取数组的总数和每个数组的维度。我们也打印结果。这里没有什么令人兴奋的。 (请注意,链接文件 inp3.txt 的数组大于问题中定义的限制:n*m <= 10^6

import Control.Monad.ST
import Control.Monad
import qualified Data.ByteString as BS
import Data.Massiv.Array as A hiding (B)
import Data.Massiv.Array.Mutable.Algorithms (quicksortByM_)
import Control.Scheduler (trivialScheduler_)

main :: IO ()
main = do
  t <- Prelude.read <$> getLine
  when (t < 1 || t > 1000) $ error $ "Invalid t: " ++ show t
  replicateM_ t $ do
    dimsStr <- getLine
    case Prelude.map Prelude.read (words dimsStr) of
      -- Test file fails this check: && n * m <= 10 ^ (6 :: Int) -> do
      [n, m] | n >= 2 && m > 0 && m <= 5 * 10 ^ (5 :: Int) -> do
        mat <- readMatrix n m
        case solve mat of
          Nothing -> putStrLn "NO"
          Just (ix, cs) -> do
            putStrLn "YES"
            putStr $ foldMap show cs
            putStr " "
            print ix
      _ -> putStrLn $ "Unexpected dimensions: " ++ show dimsStr

读取

中的数组

将输入加载到数组中是原始问题的主要问题来源:

  • 不需要依赖text,ascii字符是该问题预期的唯一有效输入。
  • 输入被读入列表列表。该列表列表是内存开销的真正来源。
  • 列表排序速度慢得离谱,而且占用大量内存。

通常在这种情况下,使用 conduit 之类的方式以流方式读取输入会好得多。特别是,将输入读取为字节流并将这些字节解析为数字将是最佳解决方案。话虽如此,在问题描述中对每个数组的宽度都有严格的要求,因此我们可以将输入逐行读取为 ByteString 然后解析数字(为简单起见假设为无符号)在每一行中,同时将这些数字写入数组。这确保在这个阶段我们只会分配结果数组和一行作为字节序列。这可以使用像 attoparsec 这样的解析库来完成,但问题很简单,只需临时执行即可。

type Val = Word

readMatrix :: Int -> Int -> IO (Matrix P Val)
readMatrix n m = createArrayS_ (Sz2 n m) readMMatrix

readMMatrix :: MMatrix RealWorld P Val -> IO ()
readMMatrix mat =
  loopM_ 0 (< n) (+ 1) $ \i -> do
    line <- BS.getLine
    --- ^ reads at most 10Mb because it is known that input will be at most
    -- 5*10^5 Words: 19 digits max per Word and one for space: 5*10^5 * 20bytes
    loopM 0 (< m) (+ 1) line $ \j bs ->
      let (word, bs') = parseWord bs
       in bs' <$ write_ mat (i :. j) word
  where
    Sz2 n m = sizeOfMArray mat
    isSpace = (== 32)
    isDigit w8 = w8 >= 48 && w8 <= 57
    parseWord bs =
      case BS.uncons bs of
        Just (w8, bs')
          | isDigit w8 -> parseWordLoop (fromIntegral (w8 - 48)) bs'
          | otherwise -> error $ "Unexpected byte: " ++ show w8
        Nothing -> error "Unexpected end of input"
    parseWordLoop !acc bs =
      case BS.uncons bs of
        Nothing -> (acc, bs)
        Just (w8, bs')
          | isSpace w8 -> (acc, bs')
          | isDigit w8 -> parseWordLoop (acc * 10 + fromIntegral (w8 - 48)) bs'
          | otherwise -> error $ "Unexpected byte: " ++ show w8

解决

这是我们实施实际解决方案的步骤。我没有尝试修复此 SO 问题中提供的解决方案,而是继续翻译了问题中链接的 C++ solution。我走那条路的原因是双重的:

  • C++ 解决方案是高度命令式的,我想证明命令式数组操作对 Haskell 并不陌生,所以我尝试创建一个尽可能接近的翻译。
  • 我知道这个解决方案有效

请注意,应该可以用 array 包重写下面的解决方案,因为最后只需要 readwriteallocate 操作。

computeSortBy ::
     (Load r Ix1 e, Manifest r' e)
  => (e -> e -> Ordering)
  -> Vector r e
  -> Vector r' e
computeSortBy f vec = 
  withLoadMArrayST_ vec $ quicksortByM_ (\x y -> pure $ f x y) trivialScheduler_

solve :: Matrix P Val -> Maybe (Int, [Color])
solve a = runST $ do
  let sz@(Sz2 n m) = size a
      ord :: Vector P Int
      ord = computeSortBy 
              (\x y -> compare (a ! (y :. 0)) (a ! (x :. 0))) (0 ..: n)
  mxl <- newMArray @P sz minBound
  loopM_ (n - 1) (>= 0) (subtract 1) $ \ i ->
    loopM_ 0 (< m) (+ 1) $ \j -> do
      writeM mxl (i :. j) (a ! ((ord ! i) :. j))
      when (i < n - 1) $
        writeM mxl (i :. j) 
          =<< max <$> readM mxl (i :. j) <*> readM mxl (i + 1 :. j)
      when (j > 0) $
        writeM mxl (i :. j) 
          =<< max <$> readM mxl (i :. j) <*> readM mxl (i :. j - 1)
  mnr <- newMArray @P sz maxBound
  loopM_ (n - 1) (>= 0) (subtract 1) $ \ i ->
    loopM_ (m - 1) (>= 0) (subtract 1) $ \ j -> do
      writeM mnr (i :. j) (a ! ((ord ! i) :. j))
      when (i < n - 1) $
        writeM mnr (i :. j) 
          =<< min <$> readM mnr (i :. j) <*> readM mnr (i + 1 :. j)
      when (j < m - 1) $
        writeM mnr (i :. j) 
          =<< min <$> readM mnr (i :. j) <*> readM mnr (i :. j + 1)
  mnl <- newMArray @P (Sz m) maxBound
  mxr <- newMArray @P (Sz m) minBound
  let goI i
        | i < n - 1 = do
          loopM_ 0 (< m) (+ 1) $ \j -> do
            val <- min (a ! ((ord ! i) :. j)) <$> readM mnl j
            writeM mnl j val
            when (j > 0) $
              writeM mnl j . min val =<< readM mnl (j - 1)
          loopM_ (m - 1) (>= 0) (subtract 1) $ \j -> do
            val <- max (a ! ((ord ! i) :. j)) <$> readM mxr j
            writeM mxr j val
            when (j < m - 1) $
              writeM mxr j . max val =<< readM mxr (j + 1)
          let goJ j
                | j < m - 1 = do
                    mnlVal <- readM mnl j
                    mxlVal <- readM mxl (i + 1 :. j)
                    mxrVal <- readM mxr (j + 1)
                    mnrVal <- readM mnr ((i + 1) :. (j + 1))
                    if mnlVal > mxlVal && mxrVal < mnrVal
                      then pure $ Just (i, j)
                      else goJ (j + 1)
                | otherwise = pure Nothing
          goJ 0 >>= \case
            Nothing -> goI (i + 1)
            Just pair -> pure $ Just pair
        | otherwise = pure Nothing
  mAns <- goI 0
  Control.Monad.forM mAns $ \ (ansFirst, ansSecond) -> do
    resVec <- createArrayS_ @BL (Sz n) $ \res ->
        iforM_ ord $ \i ordIx -> do
          writeM res ordIx $! if i <= ansFirst then R else B
    pure (ansSecond + 1, A.toList resVec)