管道展开组合

Pipes unfolds composition

我目前正在学习管道。在玩双向管道时,我注意到展开的组合看起来非常相似:

(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy x' x c' c m b') -> Proxy x' x c' c m r
-- instead of:
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r

但是由于我们 wire up 延续的方式,我们必须共享 x' 和 x 类型:

(>>=) :: Monad m => Proxy a' a b' b m r -> (r -> Proxy a' a b' b m r') -> Proxy a' a b' b m r'
case p0 of
    Request x' fx  -> Request x' (\x -> go (fx x))
    Respond b  fb' -> fb b >>= \b' -> (fb' b')
    ...

但这很容易解决:

import Pipes
import Pipes.Core hiding ((//>))

main :: IO ()
main = runEffect $ lhs //> rhs

infixl 4 //>
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //> f = p >>~ go
    where go x = go =<< request =<< f x

lhs :: Proxy x' x String String IO ()
lhs = each [1..10::Int] //> \i -> do
    r <- respond $ "response nr. " ++ show i
    lift . putStrLn $ "lhs: " ++ show r

rhs :: String -> Proxy String String  x x' IO String
rhs x = do
    lift . putStrLn $ "rhs 1: " ++ show x
    y <- request "yield manually to upstream!"
    lift . putStrLn $ "rhs 2: " ++ show y
    return "return to upstream"

预期输出:

rhs 1: "response nr. 1"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 2"
lhs: "return to upstream"
rhs 1: "response nr. 3"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 4"
lhs: "return to upstream"
rhs 1: "response nr. 5"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 6"
lhs: "return to upstream"
rhs 1: "response nr. 7"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 8"
lhs: "return to upstream"
rhs 1: "response nr. 9"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 10"
lhs: "return to upstream"

据我所知,这也没有违反任何法律。

所以最后我的问题是:为什么 Pipes 使用当前定义?

我认为 (//>)'s contract 的相关部分是...

(p //> f) replaces each respond in p with f.

... 这意味着 f 将以相同的方式处理从 p 收到的所有值。然而,这正是您的组合器所规避的——在您的示例中,您在遍历 each [1..10] 的元素时在消息集之间交替。为了进一步说明这一点,这里是你的代码的一个稍微修改的版本(特别是,我为你的组合器选择了一个不同的名称,并在 each [1..10] 之后立即使用普通的旧 (//>),因为你的组合器的行为在那种情况下相同):

infixl 4 //>*
(//>*) :: Monad m =>
    Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //>* f = p >>~ go
    where go x = f x >>= request >>= go

src :: Monad m => Producer Int m ()
src = each [1..10]

-- The types of lhs and rhs are more restrictive than yours, but for this
-- usage pattern (and with the adjustments I made) that is not a problem. 
lhs :: Show a => a -> Server String String IO ()
lhs = \i -> do
    r <- respond $ "response nr. " ++ show i
    lift . putStrLn $ "lhs: " ++ r

rhs :: String -> Client String String IO String
rhs x = do
    lift . putStrLn $ "rhs 0: Will this happen for every value?"
    lift . putStrLn $ "rhs 1: " ++ x
    y <- request "yield manually to upstream!"
    lift . putStrLn $ "rhs 2: " ++ y
    return "return to upstream"

我在 rhs 开头插入的问题的答案...

GHCi> runEffect $ (src //> lhs) //>* rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 2: response nr. 4
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 2: response nr. 6
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 2: response nr. 8
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 2: response nr. 10
lhs: return to upstream

... 不是。将其与我使用 (//>) 作为最外层组合器连接您的函数时发生的情况进行对比,如下所示:

GHCi> runEffect $ src //> (\x -> lhs x //>* rhs)
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 2
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 4
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 6
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 8
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 10
lhs: yield manually to upstream!

这里不是设置最多给出十个响应的服务器 (src //> lhs),而是每个值都会产生一个单一响应服务器,其响应由 rhs 客户端处理。鉴于没有从服务器获得第二个响应,request 之后的 rhs 中的代码永远不会是 运行。因此,来自 src 的值被统一处理。为了进一步强调这一点,请注意,使用您的组合器来执行此操作是不必要的:src //> (lhs >~> void . rhs) 做同样的事情。

(另一件需要注意的事情是,如果我们将 lhsrhs 的类型改回最初的类型,我们可以将上面的管道写成 src //>* (\x -> lhs x //>* rhs)。但是,这与 (src //>* lhs) //>* rhs 不同。那是结合律失败,因此您的组合器不会产生类别。)

这也有助于阐明将组合器替换为 (>>~) 的原因(我确定您已在测试中尝试过):

GHCi> runEffect $ (src //> lhs) >>~ void . rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2

src //> lhs 最多提供十个回复;然而,rhs 只发出了两个请求,因此其他八个响应未被使用。对我来说,这表明您的组合器最好表达为一种让客户无限期地进行请求的方式:

-- requestForever :: Monad m => (b -> Client b' b m b') -> b -> Client b' b m r
requestForever :: Monad m => 
    (b -> Proxy b' b c' c m b') -> b -> Proxy b' b c' c m r
requestForever f = go
    where go x = f x >>= request >>= go
GHCi> runEffect $ (src //> lhs) >>~ requestForever rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 2: response nr. 4
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 2: response nr. 6
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 2: response nr. 8
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 2: response nr. 10
lhs: return to upstream