在 haskell 中将 Pipes 组合成循环或循环
Composing Pipes into a loop or cycle in haskell
这个问题是关于 Haskell 库 Pipes。
此问题与 2019 年有关Advent of Code Day 11(可能剧透警告)
我有两个 Pipe Int Int m r
brain
和 robot
需要在连续循环中相互传递信息。即 brain
的输出需要转到 robot
的输入,而 robot
的输出需要转到 brain
的输入。当 brain
完成后,我需要计算结果。
如何将 brain
和 robot
组合成一个循环?理想情况下,我可以将 Effect m r
类型的循环传递给 runEffect
编辑:结果应如下所示:
+-----------+ +-----------+
| | | |
| | | |
a ==> f ==> b ==> g ==> a=|
^ | | | | |
| | | | | | | |
| +-----|-----+ +-----|-----+ |
| v v |
| () r |
+=====================================+
您可以通过将管道的输出连接到输入来制作循环管道。
cyclic :: Functor m => Producer a m r
cyclic = cyclic >-> f >-> g
考虑以下示例:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m Int
g 0 = return 100
g n = do x <- await ; yield (x + 1) ; g (n - 1)
因为这里的 f
和 g
在等待之前都不产生任何输出,使用 cyclic = cyclic >-> f >-> g
将导致 f
永远等待。避免这种情况的关键是确保 f
或 g
在等待之前产生一些东西,或者像这样将初始输入提供给第一个管道:
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
这里 运行 runEffect (cyclic' 0 >-> P.print)
给出 return 100
并打印 1 3 7 15 31 63
.
P.S。 (可能是 Advent of Code 2019 剧透)你可以使用同样的方案来完成第 7 天。如果你的 Intcode 计算机有类型 StateT IntcodeState (Pipe Int Int m)
,那么你可以使用 replicate 5 (evalState runIntcode initialIntcodeState)
来获得 5 个管道对应于 5 个中的每一个放大器。
答案
最简单的解决方案是使用 Client
和 Server
作为 danidiaz 在评论中建议的那样,因为 pipes
没有任何内置的循环管道支持,它会非常困难,如果不是不可能正确地做到这一点。这主要是因为我们需要处理 await
的数量与 yield
的数量不匹配的情况。
编辑: 我添加了有关其他答案问题的部分。请参阅 "Another problematic alternative"
部分
编辑 2: 我在下面添加了一个问题较少的可能解决方案。请参阅 "A possible solution"
部分
一个有问题的选择
但是可以在 Proxy
framework (with Client
and Server
) and the neat function generalize
的帮助下模拟它,它将单向 Pipe
变成双向 Proxy
。
generalize f x0
+-----------+ +---------------------+
| | | |
| | x <======================== x
a ==> f ==> b becomes | |
| | a ==> f ==> b
| | | | |
+-----|-----+ +----------|----------+
v v
r r
loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\ generalize p x0 //> pure
有这个形状
loop f
a
+-----|-----+
| | |
/====<=======/===<========\
| | | |
\=> a ==> f ==> a ==/
| |
+-----|-----+
v
r
如您所见,我们需要为a
输入一个初始值。这是因为无法保证管道在屈服之前不会 await
,这将迫使它永远等待。
但是请注意,如果在 await
ing 之前管道 yield
s 多次,此 将丢弃数据 ,因为泛化是在内部使用状态实现的在 yield 时保存最后一个值并在等待时检索最后一个值的 monad。
用法(有问题的想法)
要将它与您的管道一起使用,只需将它们组合起来并交给 loop
:
runEffect $ loop (f >-> g)
但是请不要使用它,因为如果你不小心它会随机丢弃数据
另一个有问题的选择
你也可以像 mingminrr 建议的那样做一个懒惰的无限管道链
infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f
这解决了 discarded/duplicated 值的问题,但还有其他几个问题。首先是在屈服之前先等待会导致无限循环和无限内存使用,但这已经在 mingmingrr 的回答中解决了。
另一个更难解决的问题是,对应的 yield 之前的每个 action 对于每个 await 都会重复一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
lift . putStrLn $ "Awaiting. n = " ++ show n
x <- await
lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
yield (x + 1)
g (n - 1)
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
现在,运行ning runEffect (cyclic' 0 >-> P.print)
将打印以下内容:
Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63
如您所见,对于每个 await
,我们都重新执行所有操作,直到相应的 yield
。更具体地说,await 触发到 运行 的管道的新副本,直到它达到产量。当我们再次 await 时,副本将 运行 直到下一次再次 yield,如果在此期间触发 await
,它将创建另一个副本并 运行 它直到第一次 yield,并且等等。
这意味着在最好的情况下,我们得到 O(n^2)
而不是线性性能(并且使用 O(n)
而不是 O(1)
内存),因为我们为每个动作重复所有内容.在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。
可能的解决方案
如果你真的必须使用 Pipe
s 而不能使用 request
/respond
并且你确定你的代码永远不会 await
超过(或之前)它 yield
s(或者在这些情况下有一个很好的默认值),我们可以在我之前的尝试的基础上建立一个解决方案,至少可以处理 yield
ing 更多的情况比你 await
.
诀窍是为 generalize
的实现添加一个缓冲区,因此多余的值会被存储而不是被丢弃。我们还可以将额外的参数保留为缓冲区为空时的默认值。
import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence
generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\ hoist lift p //> dn
where
up () = do
x <- lift $ state (takeHeadDef x0)
request x
dn a = do
x <- respond a
lift $ modify (Seq.|> x)
takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)
如果我们现在将其插入 loop
的定义中,我们将解决丢弃多余值的问题(以保留缓冲区的内存成本为代价)。它还可以防止复制默认值以外的任何值,并且仅在缓冲区为空时才使用默认值。
loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\ generalize' p x0 //> pure
如果我们希望 await
ing 在 yield
ing 之前是一个错误,我们可以简单地给 error
作为我们的默认值:loop' (error "Await without yield") somePipe
.
TL;DR
使用 Pipes.Core
中的 Client
和 Server
。它将解决您的问题,并且不会引起大量奇怪的错误。
如果这不可能,我的 "Possible solution" 部分和 generalize
的修改版本在大多数情况下应该可以完成工作。
这个问题是关于 Haskell 库 Pipes。
此问题与 2019 年有关Advent of Code Day 11(可能剧透警告)
我有两个 Pipe Int Int m r
brain
和 robot
需要在连续循环中相互传递信息。即 brain
的输出需要转到 robot
的输入,而 robot
的输出需要转到 brain
的输入。当 brain
完成后,我需要计算结果。
如何将 brain
和 robot
组合成一个循环?理想情况下,我可以将 Effect m r
类型的循环传递给 runEffect
编辑:结果应如下所示:
+-----------+ +-----------+
| | | |
| | | |
a ==> f ==> b ==> g ==> a=|
^ | | | | |
| | | | | | | |
| +-----|-----+ +-----|-----+ |
| v v |
| () r |
+=====================================+
您可以通过将管道的输出连接到输入来制作循环管道。
cyclic :: Functor m => Producer a m r
cyclic = cyclic >-> f >-> g
考虑以下示例:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m Int
g 0 = return 100
g n = do x <- await ; yield (x + 1) ; g (n - 1)
因为这里的 f
和 g
在等待之前都不产生任何输出,使用 cyclic = cyclic >-> f >-> g
将导致 f
永远等待。避免这种情况的关键是确保 f
或 g
在等待之前产生一些东西,或者像这样将初始输入提供给第一个管道:
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
这里 运行 runEffect (cyclic' 0 >-> P.print)
给出 return 100
并打印 1 3 7 15 31 63
.
P.S。 (可能是 Advent of Code 2019 剧透)你可以使用同样的方案来完成第 7 天。如果你的 Intcode 计算机有类型 StateT IntcodeState (Pipe Int Int m)
,那么你可以使用 replicate 5 (evalState runIntcode initialIntcodeState)
来获得 5 个管道对应于 5 个中的每一个放大器。
答案
最简单的解决方案是使用 Client
和 Server
作为 danidiaz 在评论中建议的那样,因为 pipes
没有任何内置的循环管道支持,它会非常困难,如果不是不可能正确地做到这一点。这主要是因为我们需要处理 await
的数量与 yield
的数量不匹配的情况。
编辑: 我添加了有关其他答案问题的部分。请参阅 "Another problematic alternative"
部分编辑 2: 我在下面添加了一个问题较少的可能解决方案。请参阅 "A possible solution"
部分一个有问题的选择
但是可以在 Proxy
framework (with Client
and Server
) and the neat function generalize
的帮助下模拟它,它将单向 Pipe
变成双向 Proxy
。
generalize f x0
+-----------+ +---------------------+
| | | |
| | x <======================== x
a ==> f ==> b becomes | |
| | a ==> f ==> b
| | | | |
+-----|-----+ +----------|----------+
v v
r r
loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\ generalize p x0 //> pure
有这个形状
loop f
a
+-----|-----+
| | |
/====<=======/===<========\
| | | |
\=> a ==> f ==> a ==/
| |
+-----|-----+
v
r
如您所见,我们需要为a
输入一个初始值。这是因为无法保证管道在屈服之前不会 await
,这将迫使它永远等待。
但是请注意,如果在 await
ing 之前管道 yield
s 多次,此 将丢弃数据 ,因为泛化是在内部使用状态实现的在 yield 时保存最后一个值并在等待时检索最后一个值的 monad。
用法(有问题的想法)
要将它与您的管道一起使用,只需将它们组合起来并交给 loop
:
runEffect $ loop (f >-> g)
但是请不要使用它,因为如果你不小心它会随机丢弃数据
另一个有问题的选择
你也可以像 mingminrr 建议的那样做一个懒惰的无限管道链
infiniteChain :: Functor m => Pipe a a m r -> Producer a m r
infiniteChain f = infiniteChain >-> f
这解决了 discarded/duplicated 值的问题,但还有其他几个问题。首先是在屈服之前先等待会导致无限循环和无限内存使用,但这已经在 mingmingrr 的回答中解决了。
另一个更难解决的问题是,对应的 yield 之前的每个 action 对于每个 await 都会重复一次。如果我们修改他们的示例以记录正在发生的事情,我们可以看到这一点:
import Pipes
import qualified Pipes.Prelude as P
f :: Monad m => Pipe Int Int m r
f = P.map (* 2)
g :: Monad m => Int -> Pipe Int Int m ()
g 0 = return ()
g n = do
lift . putStrLn $ "Awaiting. n = " ++ show n
x <- await
lift . putStrLn $ "Got: x = " ++ show x ++ " and n = "++ show n ;
yield (x + 1)
g (n - 1)
cyclic' :: Monad m => Int -> Producer Int m Int
cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
现在,运行ning runEffect (cyclic' 0 >-> P.print)
将打印以下内容:
Awaiting. n = 6
Got: x = 0 and n = 6
1
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
7
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
15
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
31
Awaiting. n = 1
Awaiting. n = 2
Awaiting. n = 3
Awaiting. n = 4
Awaiting. n = 5
Awaiting. n = 6
Got: x = 0 and n = 6
Got: x = 2 and n = 5
Got: x = 6 and n = 4
Got: x = 14 and n = 3
Got: x = 30 and n = 2
Got: x = 62 and n = 1
63
如您所见,对于每个 await
,我们都重新执行所有操作,直到相应的 yield
。更具体地说,await 触发到 运行 的管道的新副本,直到它达到产量。当我们再次 await 时,副本将 运行 直到下一次再次 yield,如果在此期间触发 await
,它将创建另一个副本并 运行 它直到第一次 yield,并且等等。
这意味着在最好的情况下,我们得到 O(n^2)
而不是线性性能(并且使用 O(n)
而不是 O(1)
内存),因为我们为每个动作重复所有内容.在最坏的情况下,例如如果我们正在读取或写入文件,我们可能会得到完全错误的结果,因为我们正在重复副作用。
可能的解决方案
如果你真的必须使用 Pipe
s 而不能使用 request
/respond
并且你确定你的代码永远不会 await
超过(或之前)它 yield
s(或者在这些情况下有一个很好的默认值),我们可以在我之前的尝试的基础上建立一个解决方案,至少可以处理 yield
ing 更多的情况比你 await
.
诀窍是为 generalize
的实现添加一个缓冲区,因此多余的值会被存储而不是被丢弃。我们还可以将额外的参数保留为缓冲区为空时的默认值。
import Pipes.Lift (evalStateP)
import Control.Monad.Trans.State.Strict (state, modify)
import qualified Data.Sequence
generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
generalize' p x0 = evalStateP Seq.empty $ up >\ hoist lift p //> dn
where
up () = do
x <- lift $ state (takeHeadDef x0)
request x
dn a = do
x <- respond a
lift $ modify (Seq.|> x)
takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
takeHeadDef x0 xs = (foldr const x0 xs, Seq.drop 1 xs)
如果我们现在将其插入 loop
的定义中,我们将解决丢弃多余值的问题(以保留缓冲区的内存成本为代价)。它还可以防止复制默认值以外的任何值,并且仅在缓冲区为空时才使用默认值。
loop' :: Monad m => a -> Pipe a a m r -> Effect m r
loop' x0 p = pure >\ generalize' p x0 //> pure
如果我们希望 await
ing 在 yield
ing 之前是一个错误,我们可以简单地给 error
作为我们的默认值:loop' (error "Await without yield") somePipe
.
TL;DR
使用 Pipes.Core
中的 Client
和 Server
。它将解决您的问题,并且不会引起大量奇怪的错误。
如果这不可能,我的 "Possible solution" 部分和 generalize
的修改版本在大多数情况下应该可以完成工作。