如何将基于拉的管道变成基于推的管道?

How to turn a pull based pipe into a push based one?

默认情况下,管道是基于拉动的。这是由于运算符 >-> 是通过 +>> 实现的,这是他的拉动类别的有意义的 bind 运算符。我的理解是,这意味着如果你有像 producer >-> consumer 这样的代码,消费者的 body 将首先被调用,然后一旦它等待数据,生产者将被调用。

我在 pipes 文档 here 中看到您可以使用 Pipes.Core 中的代码 (reflect .) 将基于拉的管道转换为基于推的管道.这意味着(如果我错了请纠正我)在上面的代码 producer >-> consumer 中,生产者首先是 运行,产生一个值,然后消费者尝试消费。这看起来很有用,我想知道怎么做。

我还在讨论 here 中看到 >-> 没有基于推送的对应物,因为它很容易扭转任何管道(我假设使用 reflect?),但我可以真的想知道怎么做或找到任何例子。

这是我尝试过的一些代码:

stdin :: Producer String IO r
stdin = forever $ do
  lift $ putStrLn "stdin"
  str <- lift getLine
  yield str

countLetters :: Consumer String IO r
countLetters = forever $ do
  lift $ putStrLn "countLetters"
  str <- await
  lift . putStrLn . show . length $ str

-- this works in pull mode
runEffect (stdin >-> countLetters)

-- equivalent to above, works
runEffect ((\() -> stdin) +>> countLetters)

-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))

-- does not compile
runEffect (countLetters >>~ (\() -> stdin))
-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))

我收集到的问题是,虽然生产者如预期的那样 运行 首先,但第一个产生的值被丢弃了。比较...

GHCi> runEffect (stdin >-> countLetters)
countLetters
stdin
foo
3
countLetters
stdin
glub
4
countLetters
stdin

...与:

GHCi> runEffect (stdin >>~ (\_ -> countLetters))
stdin
foo
countLetters
stdin
glub
4
countLetters
stdin

Gabriella Gonzalez's answer to this question 详细讨论了这个问题。它归结为你给 (>>~) 的函数的参数如何是基于推送的流程中的“驱动”输入,所以如果你 const 它离开你最终放弃第一个输入。解决方案是相应地重塑 countLetters

countLettersPush :: String -> Consumer String IO r
countLettersPush str = do
  lift $ putStrLn "countLetters"
  lift . putStrLn . show . length $ str
  str' <- await
  countLettersPush str'
GHCi> runEffect (stdin >>~ countLettersPush)
stdin
foo
countLetters
3
stdin
glub
countLetters
4
stdin

I've also seen in discussions here that there is no push based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?)

我不太确定我的立场,但似乎并不完全适用于上述解决方案。我们 可以 做的是,既然我们已经让基于推送的流程正常工作,我们可以使用 reflect 将其转回基于拉取的流程:

-- Preliminary step: switching to '(>~>)'.
stdin >>~ countLettersPush
(const stdin >~> countLettersPush) ()

-- Applying 'reflect', as the documentation suggests.
reflect . (const stdin >~> countLettersPush)
reflect . const stdin <+< reflect . countLettersPush
const (reflect stdin) <+< reflect . countLettersPush

-- Rewriting in terms of '(+>>)'.
(reflect . countLettersPush >+> const (reflect stdin)) ()
reflect . countLettersPush +>> reflect stdin

这确实是基于拉动的,因为流量是由 reflect stdin 驱动的,下游 Client:

GHCi> :t reflect stdin
reflect stdin :: Proxy String () () X IO r
GHCi> :t reflect stdin :: Client String () IO r
reflect stdin :: Client String () IO r :: Client String () IO r

然而,该流程涉及向上游发送 Strings,因此不能用 (>->) 表示,也就是说,仅下游:

GHCi> -- Compare the type of the second argument with that of 'reflect stdin'
GHCi> :t (>->)
(>->)
  :: Monad m =>
     Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m