在 haskell 中将 Pipes 组合成循环或循环

Composing Pipes into a loop or cycle in haskell

这个问题是关于 Haskell 库 Pipes

此问题与 2019 年有关Advent of Code Day 11(可能剧透警告)

我有两个 Pipe Int Int m r brainrobot 需要在连续循环中相互传递信息。即 brain 的输出需要转到 robot 的输入,而 robot 的输出需要转到 brain 的输入。当 brain 完成后,我需要计算结果。

如何将 brainrobot 组合成一个循环?理想情况下,我可以将 Effect m r 类型的循环传递给 runEffect

编辑:结果应如下所示:

   +-----------+     +-----------+   
   |           |     |           |   
   |           |     |           |   
a ==>    f    ==> b ==>    g    ==> a=|
^  |           |     |           |    |
|  |     |     |     |     |     |    |
|  +-----|-----+     +-----|-----+    |
|        v                 v          |
|        ()                r          |
+=====================================+

您可以通过将管道的输出连接到输入来制作循环管道。

cyclic :: Functor m => Producer a m r
cyclic = cyclic >-> f >-> g

考虑以下示例:

import Pipes
import qualified Pipes.Prelude as P

f :: Monad m => Pipe Int Int m r
f = P.map (* 2)

g :: Monad m => Int -> Pipe Int Int m Int
g 0 = return 100
g n = do x <- await ; yield (x + 1) ; g (n - 1)

因为这里的 fg 在等待之前都不产生任何输出,使用 cyclic = cyclic >-> f >-> g 将导致 f 永远等待。避免这种情况的关键是确保 fg 在等待之前产生一些东西,或者像这样将初始输入提供给第一个管道:

cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe

这里 运行 runEffect (cyclic' 0 >-> P.print) 给出 return 100 并打印 1 3 7 15 31 63.

P.S。 (可能是 Advent of Code 2019 剧透)你可以使用同样的方案来完成第 7 天。如果你的 Intcode 计算机有类型 StateT IntcodeState (Pipe Int Int m),那么你可以使用 replicate 5 (evalState runIntcode initialIntcodeState) 来获得 5 个管道对应于 5 个中的每一个放大器。

答案

最简单的解决方案是使用 ClientServer 作为 danidiaz 在评论中建议的那样,因为 pipes 没有任何内置的循环管道支持,它会非常困难,如果不是不可能正确地做到这一点。这主要是因为我们需要处理 await 的数量与 yield 的数量不匹配的情况。

编辑: 我添加了有关其他答案问题的部分。请参阅 "Another problematic alternative"

部分

编辑 2: 我在下面添加了一个问题较少的可能解决方案。请参阅 "A possible solution"

部分

一个有问题的选择

但是可以在 Proxy framework (with Client and Server) and the neat function generalize 的帮助下模拟它,它将单向 Pipe 变成双向 Proxy

                                       generalize f x0
   +-----------+                   +---------------------+
   |           |                   |                     |
   |           |                x <======================== x
a ==>    f    ==> b   becomes      |                     |
   |           |                a ==>         f         ==> b
   |     |     |                   |                     |
   +-----|-----+                   +----------|----------+
         v                                    v     
         r                                    r     

现在我们可以使用//> and >\来堵住两端并使流循环:

loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\ generalize p x0 //> pure

有这个形状

            loop f

              a 
        +-----|-----+
        |     |     |
 /====<=======/===<========\
 |      |           |      |
 \=> a ==>    f    ==> a ==/
        |           |
        +-----|-----+
              v    
              r    

如您所见,我们需要为a输入一个初始值。这是因为无法保证管道在屈服之前不会 await,这将迫使它永远等待。

但是请注意,如果在 awaiting 之前管道 yields 多次,此 将丢弃数据 ,因为泛化是在内部使用状态实现的在 yield 时保存最后一个值并在等待时检索最后一个值的 monad。

用法(有问题的想法)

要将它与您的管道一起使用,只需将它们组合起来并交给 loop:

runEffect $ loop (f >-> g)

但是请不要使用它,因为如果你不小心它会随机丢弃数据

另一个有问题的选择

你也可以像 mingminrr 建议的那样做一个懒惰的无限管道链

infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f

这解决了 discarded/duplicated 值的问题,但还有其他几个问题。首先是在屈服之前先等待会导致无限循环和无限内存使用,但这已经在 mingmingrr 的回答中解决了。

另一个更难解决的问题是,对应的 yield 之前的每个 action 对于每个 await 都会重复一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:

import Pipes
import qualified Pipes.Prelude as P

f :: Monad m => Pipe Int Int m r
f = P.map (* 2)

g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
  lift . putStrLn $ "Awaiting. n = " ++ show n
  x <- await
  lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
  yield (x + 1)
  g (n - 1)

cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe

现在,运行ning runEffect (cyclic' 0 >-> P.print) 将打印以下内容:

Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63

如您所见,对于每个 await,我们都重新执行所有操作,直到相应的 yield。更具体地说,await 触发到 运行 的管道的新副本,直到它达到产量。当我们再次 await 时,副本将 运行 直到下一次再次 yield,如果在此期间触发 await,它将创建另一个副本并 运行 它直到第一次 yield,并且等等。

这意味着在最好的情况下,我们得到 O(n^2) 而不是线性性能(并且使用 O(n) 而不是 O(1) 内存),因为我们为每个动作重复所有内容.在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。

可能的解决方案

如果你真的必须使用 Pipes 而不能使用 request/respond 并且你确定你的代码永远不会 await 超过(或之前)它 yields(或者在这些情况下有一个很好的默认值),我们可以在我之前的尝试的基础上建立一个解决方案,至少可以处理 yielding 更多的情况比你 await.

诀窍是为 generalize 的实现添加一个缓冲区,因此多余的值会被存储而不是被丢弃。我们还可以将额外的参数保留为缓冲区为空时的默认值。

import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence

generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\ hoist lift p //> dn
  where
    up () = do
        x <- lift $ state (takeHeadDef x0)
        request x
    dn a = do
        x <- respond a
        lift $ modify (Seq.|> x)
    takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
    takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)

如果我们现在将其插入 loop 的定义中,我们将解决丢弃多余值的问题(以保留缓冲区的内存成本为代价)。它还可以防止复制默认值以外的任何值,并且仅在缓冲区为空时才使用默认值。

loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\ generalize' p x0 //> pure

如果我们希望 awaiting 在 yielding 之前是一个错误,我们可以简单地给 error 作为我们的默认值:loop' (error "Await without yield") somePipe.

TL;DR

使用 Pipes.Core 中的 ClientServer。它将解决您的问题,并且不会引起大量奇怪的错误。

如果这不可能,我的 "Possible solution" 部分和 generalize 的修改版本在大多数情况下应该可以完成工作。