如何使 Pipe 与 Haskell 的 Pipe 库并发?

How can I make a Pipe concurrent with Haskell's Pipe library?

我有一些 Haskell 使用管道的代码:

module Main(main) where
import Pipes

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = runEffect $ a >-> b >-> c

Pipes.Concurrent tutorial 演示了使用多个 worker 以及工作窃取。我怎样才能在 b 中做类似的事情?我希望 b 使用一定数量的工人同时执行它的工作。

显然,并发在这种情况下没有用,但这是我能想到的最简单的示例。在我的实际用例中,我想使用有限数量的工作人员同时发出一些 Web 请求。

编辑:我误解了你的意思; 您也许可以在管道内执行此操作,但我不确定动机是什么。我建议构建可重复使用的管道链,并使用 worker 向他们派遣,而不是尝试在管道内构建 worker。如果您将它构建到管道本身,您将失去任何先进先出的顺序保证。

Work Stealing 上的部分就是您要查找的内容,这段代码基本上是教程中的逐字记录,但让我们分解一下它的工作原理。这是我们可以为所欲为的一种方式:

module Main(main) where
import Pipes
import Pipes.Concurrent

import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = do
  (output, input) <- spawn unbounded
  feeder <- async $ do runEffect $ a >-> toOutput output
                       performGC

  workers <- forM [1..3] $ \i ->
    async $ do runEffect $ fromInput input  >-> b >-> c
               performGC

  mapM_ wait (feeder:workers)

第一行spawn unbounded来自Pipes.Concurrent,它初始化了一个'mailbox',它有一个输入和输出句柄。一开始我很困惑,但在这种情况下,我们将消息发送到输出并从输入中提取它们。这类似于 golang 等语言中的推拉消息通道。

我们指定一个 Buffer 来表示我们可以存储多少条消息,在这种情况下我们将 no-limit 设置为 unbounded。

好的,邮箱已经初始化,我们现在可以创建 Effects 来向它发送消息。邮箱通道是使用 STM 实现的,因此它可以异步收集消息。

让我们创建一个为邮箱提供数据的异步作业;

feeder <- async $ do runEffect $ a >-> toOutput output
                     performGC

a >-> toOutput output 只是正常的管道组合,我们需要 toOutput 将输出转换为管道。请注意 performGC 调用也是 IO 的一部分,它允许 Pipes.Concurrent 知道在作业完成后进行清理。如果愿意,我们可以使用 forkIO 运行,但在这种情况下,我们使用 async 以便我们可以等待结果稍后完成。好的,那么我们的邮箱应该是异步接收消息,让我们把它们拉出来做一些工作。

workers <- forM [1..3] $ \i ->
  async $ do runEffect $ fromInput input  >-> b >-> c
             performGC

与以前相同的想法,但这次我们只是产生了其中的一些。我们像使用 fromInput 的普通管道一样从输入中读取输入,然后 运行 通过我们链的其余部分读取它,完成后进行清理。 input 将确保每次拉出一个值时只有一个工人接收它。当所有送入 output 的作业完成时(它跟踪所有打开的作业)然后它将关闭 input 管道并且工作人员将完成。

如果您在 web-worker 场景中使用它,您将有一个主循环,它不断向 toOutput output 通道发送请求,然后生成任意数量的工作人员,他们将拉入他们的管道来自 fromInput input.