向下游发出上游已耗尽的信号

Signalling to downstream that upstream is exhausted

问题

将 Haskell pipes library, I'm trying to define a Pipe 与以下类型一起使用:

signalExhausted :: Monad m => Pipe a (Value a) m r 

其中 Value 数据类型定义为:

data Value a = Value a | Exhausted

管道应遵循以下规律:

toList (each [] >-> signalExhausted) ==                 [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]

换句话说,管道应该等同于 Pipes.Prelude.map Value,除了它应该在处理完所有上游值后产生一个额外的 Exhausted,让下游有机会执行一些最终操作.

这样的Pipe可以定义吗?

示例

> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]

备注

我知道 pipes-parse 库提供函数 drawparseForever。这些看起来很有用,但我不太明白如何将它们组合成符合上述规范的 Pipe

不能定义像signalExhausted这样的管道,但是可以定义相当于(>-> signalExhausted)的函数。

>-> is a specialized version of the pull category. Execution is driven by the downstream proxies pulling data from upstream proxies. The downstream proxy sends an empty request () upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and doesn't have any more values to send back, it returns. You can see the return that matters for these examples in the definition of each.

each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data's exhausted ^                       

下游代理需要一个值才能继续运行,但是管道库可能无法提供任何值,因此下游代理永远不会再次运行。由于它再也不会运行,因此无法修改数据或对数据做出反应。

这个问题有两种解决方法。最简单的是 map Value over the upstream pipe 并在完成后添加一个 yield Exhausted.

import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

signalExhausted p = p >-> P.map Value >> yield Exhausted

除了函数 signalExhausted 取代了 (>-> signalExhausted).

之外,这完全符合您的要求
let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs

[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]

此问题更通用的解决方案是停止上游代理 returning 并在其耗尽时向下游发出信号。我在 .

中演示了如何执行此操作
import Control.Monad
import Pipes.Core

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\)

这会将每个 respond 替换为 respond . Right,并将 return 替换为 forever . respond . left,向下游发送 return 秒以及响应。

returnDownstream 比您要查找的内容更笼统。我们可以演示如何使用它来重新创建 signalExhaustedreturnDownstream 将 return 的管道转换为从不 return 的管道,而是将其 return 值作为 [=37] 的 Left 值向下游转发=].

signalExhausted p = returnDownstream p >-> respondLeftOnce

respondLeftOnce 是下游代理的示例。下游代理可以区分 Right 中保存的常规值和 Left.

中保存的 return 值
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
    where
        go = do
            ea <- await
            case ea of
                Right a -> yield (Value a) >> go                    
                Left  _ -> yield Exhausted       -- The upstream proxy is exhausted; do something else