如何使 Pipe 与 Haskell 的 Pipe 库并发?
How can I make a Pipe concurrent with Haskell's Pipe library?
我有一些 Haskell 使用管道的代码:
module Main(main) where
import Pipes
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = runEffect $ a >-> b >-> c
Pipes.Concurrent tutorial 演示了使用多个 worker 以及工作窃取。我怎样才能在 b
中做类似的事情?我希望 b
使用一定数量的工人同时执行它的工作。
显然,并发在这种情况下没有用,但这是我能想到的最简单的示例。在我的实际用例中,我想使用有限数量的工作人员同时发出一些 Web 请求。
编辑:我误解了你的意思;
您也许可以在管道内执行此操作,但我不确定动机是什么。我建议构建可重复使用的管道链,并使用 worker 向他们派遣,而不是尝试在管道内构建 worker。如果您将它构建到管道本身,您将失去任何先进先出的顺序保证。
Work Stealing 上的部分就是您要查找的内容,这段代码基本上是教程中的逐字记录,但让我们分解一下它的工作原理。这是我们可以为所欲为的一种方式:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
第一行spawn unbounded
来自Pipes.Concurrent,它初始化了一个'mailbox',它有一个输入和输出句柄。一开始我很困惑,但在这种情况下,我们将消息发送到输出并从输入中提取它们。这类似于 golang 等语言中的推拉消息通道。
我们指定一个 Buffer 来表示我们可以存储多少条消息,在这种情况下我们将 no-limit 设置为 unbounded。
好的,邮箱已经初始化,我们现在可以创建 Effect
s 来向它发送消息。邮箱通道是使用 STM 实现的,因此它可以异步收集消息。
让我们创建一个为邮箱提供数据的异步作业;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
a >-> toOutput output
只是正常的管道组合,我们需要 toOutput
将输出转换为管道。请注意 performGC
调用也是 IO 的一部分,它允许 Pipes.Concurrent 知道在作业完成后进行清理。如果愿意,我们可以使用 forkIO
运行,但在这种情况下,我们使用 async
以便我们可以等待结果稍后完成。好的,那么我们的邮箱应该是异步接收消息,让我们把它们拉出来做一些工作。
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
与以前相同的想法,但这次我们只是产生了其中的一些。我们像使用 fromInput
的普通管道一样从输入中读取输入,然后 运行 通过我们链的其余部分读取它,完成后进行清理。 input
将确保每次拉出一个值时只有一个工人接收它。当所有送入 output
的作业完成时(它跟踪所有打开的作业)然后它将关闭 input
管道并且工作人员将完成。
如果您在 web-worker 场景中使用它,您将有一个主循环,它不断向 toOutput output
通道发送请求,然后生成任意数量的工作人员,他们将拉入他们的管道来自 fromInput input
.
我有一些 Haskell 使用管道的代码:
module Main(main) where
import Pipes
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = runEffect $ a >-> b >-> c
Pipes.Concurrent tutorial 演示了使用多个 worker 以及工作窃取。我怎样才能在 b
中做类似的事情?我希望 b
使用一定数量的工人同时执行它的工作。
显然,并发在这种情况下没有用,但这是我能想到的最简单的示例。在我的实际用例中,我想使用有限数量的工作人员同时发出一些 Web 请求。
编辑:我误解了你的意思; 您也许可以在管道内执行此操作,但我不确定动机是什么。我建议构建可重复使用的管道链,并使用 worker 向他们派遣,而不是尝试在管道内构建 worker。如果您将它构建到管道本身,您将失去任何先进先出的顺序保证。
Work Stealing 上的部分就是您要查找的内容,这段代码基本上是教程中的逐字记录,但让我们分解一下它的工作原理。这是我们可以为所欲为的一种方式:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
第一行spawn unbounded
来自Pipes.Concurrent,它初始化了一个'mailbox',它有一个输入和输出句柄。一开始我很困惑,但在这种情况下,我们将消息发送到输出并从输入中提取它们。这类似于 golang 等语言中的推拉消息通道。
我们指定一个 Buffer 来表示我们可以存储多少条消息,在这种情况下我们将 no-limit 设置为 unbounded。
好的,邮箱已经初始化,我们现在可以创建 Effect
s 来向它发送消息。邮箱通道是使用 STM 实现的,因此它可以异步收集消息。
让我们创建一个为邮箱提供数据的异步作业;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
a >-> toOutput output
只是正常的管道组合,我们需要 toOutput
将输出转换为管道。请注意 performGC
调用也是 IO 的一部分,它允许 Pipes.Concurrent 知道在作业完成后进行清理。如果愿意,我们可以使用 forkIO
运行,但在这种情况下,我们使用 async
以便我们可以等待结果稍后完成。好的,那么我们的邮箱应该是异步接收消息,让我们把它们拉出来做一些工作。
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
与以前相同的想法,但这次我们只是产生了其中的一些。我们像使用 fromInput
的普通管道一样从输入中读取输入,然后 运行 通过我们链的其余部分读取它,完成后进行清理。 input
将确保每次拉出一个值时只有一个工人接收它。当所有送入 output
的作业完成时(它跟踪所有打开的作业)然后它将关闭 input
管道并且工作人员将完成。
如果您在 web-worker 场景中使用它,您将有一个主循环,它不断向 toOutput output
通道发送请求,然后生成任意数量的工作人员,他们将拉入他们的管道来自 fromInput input
.