使用严格的单子操作无限列表

Operating infinite lists with strict monads

我有一个函数 f :: [a] -> b 可以在无限列表上运行(例如 take 5takeWhile (< 100) . scanl (+) 0 等等)。我想为这个函数提供由严格的单子动作生成的值(例如 randomIO)。

this question,我了解到 repeatsequence 技巧方法不适用于严格的单子,如下例所示:

import Control.Monad.Identity

take 5 <$> sequence (repeat $ return 1) :: Identity [Int]
-- returns `Identity [1,1,1,1,1]`
-- works because Identity is non-strict

take 5 <$> sequence (repeat $ return 1) :: IO [Int]
 -- returns `*** Exception: stack overflow`
 -- does not work because IO is strict

所以,相反,我考虑使用函数"inside" monadic 上下文。我受到这个 circular programming example 的启发并尝试了:

let loop = do
       x <- return 1
       (_, xs) <- loop
       return (take 5 xs, x:xs)
in  fst loop :: Identity [Int]
-- Overflows the stack

import Control.Monad.Fix

fst <$> mfix (\(_, xs) -> do
   x <- return 1
   return (take 5 xs, x:xs)) :: Identity [Int]
-- Overflows the stack

甚至

{-# LANGUAGE RecursiveDo #-}
import System.Random

loop' = mdo
   (xs', xs) <- loop xs
   return xs'
where loop xs = do
      x <- randomIO
      return (take 5 xs, x:xs)

print $ loop'
-- Returns a list of 5 identical values

但是 none 这些作品。我还尝试了一种 Conduit 方法,即使在 Identity 情况下也不起作用:

import Conduit

runConduitPure $ yieldMany [1..] .| sinkList >>= return . take 5

所以我想知道:

  1. 为什么上述 "circular" 方法中的 none 有效?

  2. 如果存在不涉及unsafeInterleaveIO的解决方案。 (也许 iteratee 秒,Arrow 秒?)

I am using randomIO here just for simplicity of the examples. In practice, I would like to process messages received via sockets

如果没有 unsafeInterleaveIO 是不可能的。归根结底,问题是必须对 IO 操作进行排序。虽然引用透明值的评估顺序并不重要,但 IO 操作的顺序可能。如果你想要一个通过套接字接收到的所有消息的惰性无限列表,你必须事先通知 Haskell 在 IO 操作序列中适合的位置(除非你使用 unsafeInterleaveIO) .

您正在寻找的 Arrow 抽象被称为 ArrowLoop,但对于严格的 monads,它也存在 右紧律 的问题。

乍一看,它可能看起来像 MonadFix (manifested via mdo or mfix) is a solution too, but digging a bit deeper shows that fixIO has problems,就像 ArrowLoop 中的 loop

不过,有时候限制IO个动作必须运行一个接一个,有点过分了,这就是unsafeInterleaveIO的用意。引用 docs

unsafeInterleaveIO allows IO computation to be deferred lazily. When passed a value of type IO a, the IO will only be performed when the value of the a is demanded.


现在,即使您明确表示您想要一个unsafeInterleaveIO解决方案,因为我希望能够说服您这是可行的方法,这里是:

interweavingRepeatM :: IO a -> IO [a]
interweavingRepeatM action = unsafeInterleaveIO ((:) <$> action <*> interweavingRepeatM action)

这里是为随机数工作的:

ghci> import System.Random
ghci> sourceOfRandomness <- interweavingRepeatM randomIO :: IO [Integer]
ghci> take 10 sourceOfRandomness
[-2002742716261662204,7803971943047671004,-8395318556488893887,-7372674153585794391,5906750628663631621,6428130029392850107,6453903217221537923,-8966011929671667536,6419977320189968675,-1842456468700051776]

涵盖了您的一般问题。以下是关于 conduit 和类似的流媒体库的具体内容。

I also tried a Conduit approach which did not work either even in the Identity case:

import Conduit

runConduitPure $ yieldMany [1..] .| sinkList >>= return . take 5

虽然通常使用流媒体库来避免您提到的那种困难(参见 Pipes.Tutorial), they assume you will use their stream types instead of lists. Consider, for instance, how sinkList 的开场白由 Conduit 文档描述:

Consume all values from the stream and return as a list. Note that this will pull all values into memory.

这意味着在 yieldMany 之后立即使用 sinkMany 将您带回到原点:将所有值存入内存正是 sequenceIO 组合的原因和无限列表不可用。相反,您应该做的是使用流媒体库的基础设施来构建管道的各个阶段。这里有一些简单的例子,主要使用 conduitconduit-combinators:

中的 ready-made 东西
GHCi> import Conduit
GHCi> runConduitPure $ yieldMany [1..] .| takeC 5 .| sinkList
[1,2,3,4,5]
GHCi> runConduit $ yieldMany [1..] .| takeC 5 .| printC -- try it without takeC
1
2
3
4
5
GHCi> runConduit $ yieldMany [1..] .| takeC 5 .| scanlC (+) 0 .| printC
0
1
3
6
10
15
GHCi> :{
GHCi| runConduit $ yieldMany [1..] .| takeC 5
GHCi|     .| awaitForever (\x -> liftIO (print (2*x)) >> yield x)
GHCi|     .| printC
GHCi| :}
2
1
4
2
6
3
8
4
10
5
GHCi> runConduit $ (sourceRandom :: Producer IO Int) .| takeC 5 .| printC 
1652736016140975126
5518223062916052424
-1236337270682979278
8079753510915129274
-609160753105692151

(感谢 Michael 让我注意到 sourceRandom -- 起初我用 repeatMC randomIO 推出了自己的产品。)