具有并发限制的并行(Aff)执行?
Parallel (Aff) execution with concurrent limit?
在Aff方面并发进程限制下实现并行执行的方式是什么?我相信标准库中没有方法,也没有找到一个很好的完整答案。
parSequenceWithLmit :: Array (Aff X) -> Int -> Aff (Array X)
Aff X
计算应该并行进行,但不能超过给定的 N 个并发计算。所以它开始N个cals,当一个完成时,下一个(左边的)开始。
对于这种事情,一个好的机制是 AVar
,它是一个阻塞的可变单元格。它在概念上可以被认为是一个单元素阻塞队列。
第一个,一个AVar
可能是空的也可能是满的。您可以使用 empty
, and then you can "fill" it with a value using put
创建一个空的。这里有用的一点是,当你调用 put
并且 AVar
已经“满”时,put
将 阻塞 直到它再次为空。
Second,您可以使用 take
读取值,这将 return 您的值,但将 AVar
留空同一时间。与 put
类似,如果 AVar
为空,take
将阻塞直到填满。
所以你可以用它做以下事情:
- 创建单个
AVar
.
- 分叉 N 个进程,每个进程都会
take
来自 AVar
的值并处理它,然后循环。永远。
- 有一个协调进程,它将遍历整个工作序列和
put
个工作项到 AVar
。
当所有工作进程都忙时,orchestrator 进程会向 AVar
中推送另一个值,然后会尝试推送下一个,但此时会阻塞,因为 AVar
已经满了。它将保持阻塞状态,直到其中一个工作进程完成其工作并调用 take
以获取下一个工作项,使 AVar
为空。这将解除阻塞 orchestrator 进程,它会立即将下一个工作项推送到 AVar
,依此类推。
这里缺少的一点是如何停止。如果工作进程只是无限循环,它们将永远不会退出。当协调器进程最终用完工作并停止填充 AVar
时,工作进程将永远阻塞在 take
调用上。不好。
所以要解决这个问题,有两种工作项 - (1) 实际工作和 (2) 停止处理的命令。然后让 orchestrator 进程首先推送所有工作项,一旦完成,推送 N 个命令停止。您可以选择推送 N+1 个命令来停止:这将保证编排程序进程阻塞,直到最后一个工作人员完成。
将所有这些放在一起,这是一个演示程序:
module Main where
import Prelude
import Data.Array ((..))
import Data.Foldable (for_)
import Data.Int (toNumber)
import Effect (Effect)
import Effect.AVar (AVar)
import Effect.Aff (Aff, Milliseconds(..), delay, forkAff, launchAff_)
import Effect.Aff.AVar as AVar
import Effect.Class (liftEffect)
import Effect.Console (log)
data Work a = Work a | Done
process :: Int -> AVar (Work Int) -> Aff Unit
process myIndex v = do
w <- AVar.take v
case w of
Done ->
pure unit
Work i -> do
liftEffect $ log $ "Worker " <> show myIndex <> ": Processing " <> show i
delay $ Milliseconds $ toNumber i
liftEffect $ log $ "Worker " <> show myIndex <> ": Processed " <> show i
process myIndex v
main :: Effect Unit
main = launchAff_ do
var <- AVar.empty
for_ (1..5) \idx -> forkAff $ process idx var
let inputs = [100,200,300,300,400,1000,2000,101,102,103,104]
for_ inputs \i -> AVar.put (Work i) var
for_ (1..6) \_ -> AVar.put Done var
在这个程序中,我的工作项只是数字,表示睡眠的毫秒数。我将其用作每个工作项处理“昂贵”程度的模型。程序输出将是这样的:
Worker 1: Processing 100
Worker 2: Processing 200
Worker 3: Processing 300
Worker 4: Processing 300
Worker 5: Processing 400
Worker 1: Processed 100
Worker 1: Processing 1000
Worker 2: Processed 200
Worker 2: Processing 2000
Worker 3: Processed 300
Worker 3: Processing 101
Worker 4: Processed 300
Worker 4: Processing 102
Worker 5: Processed 400
Worker 5: Processing 103
Worker 3: Processed 101
Worker 3: Processing 104
Worker 4: Processed 102
Worker 5: Processed 103
Worker 3: Processed 104
Worker 1: Processed 1000
Worker 2: Processed 2000
在Aff方面并发进程限制下实现并行执行的方式是什么?我相信标准库中没有方法,也没有找到一个很好的完整答案。
parSequenceWithLmit :: Array (Aff X) -> Int -> Aff (Array X)
Aff X
计算应该并行进行,但不能超过给定的 N 个并发计算。所以它开始N个cals,当一个完成时,下一个(左边的)开始。
对于这种事情,一个好的机制是 AVar
,它是一个阻塞的可变单元格。它在概念上可以被认为是一个单元素阻塞队列。
第一个,一个AVar
可能是空的也可能是满的。您可以使用 empty
, and then you can "fill" it with a value using put
创建一个空的。这里有用的一点是,当你调用 put
并且 AVar
已经“满”时,put
将 阻塞 直到它再次为空。
Second,您可以使用 take
读取值,这将 return 您的值,但将 AVar
留空同一时间。与 put
类似,如果 AVar
为空,take
将阻塞直到填满。
所以你可以用它做以下事情:
- 创建单个
AVar
. - 分叉 N 个进程,每个进程都会
take
来自AVar
的值并处理它,然后循环。永远。 - 有一个协调进程,它将遍历整个工作序列和
put
个工作项到AVar
。
当所有工作进程都忙时,orchestrator 进程会向 AVar
中推送另一个值,然后会尝试推送下一个,但此时会阻塞,因为 AVar
已经满了。它将保持阻塞状态,直到其中一个工作进程完成其工作并调用 take
以获取下一个工作项,使 AVar
为空。这将解除阻塞 orchestrator 进程,它会立即将下一个工作项推送到 AVar
,依此类推。
这里缺少的一点是如何停止。如果工作进程只是无限循环,它们将永远不会退出。当协调器进程最终用完工作并停止填充 AVar
时,工作进程将永远阻塞在 take
调用上。不好。
所以要解决这个问题,有两种工作项 - (1) 实际工作和 (2) 停止处理的命令。然后让 orchestrator 进程首先推送所有工作项,一旦完成,推送 N 个命令停止。您可以选择推送 N+1 个命令来停止:这将保证编排程序进程阻塞,直到最后一个工作人员完成。
将所有这些放在一起,这是一个演示程序:
module Main where
import Prelude
import Data.Array ((..))
import Data.Foldable (for_)
import Data.Int (toNumber)
import Effect (Effect)
import Effect.AVar (AVar)
import Effect.Aff (Aff, Milliseconds(..), delay, forkAff, launchAff_)
import Effect.Aff.AVar as AVar
import Effect.Class (liftEffect)
import Effect.Console (log)
data Work a = Work a | Done
process :: Int -> AVar (Work Int) -> Aff Unit
process myIndex v = do
w <- AVar.take v
case w of
Done ->
pure unit
Work i -> do
liftEffect $ log $ "Worker " <> show myIndex <> ": Processing " <> show i
delay $ Milliseconds $ toNumber i
liftEffect $ log $ "Worker " <> show myIndex <> ": Processed " <> show i
process myIndex v
main :: Effect Unit
main = launchAff_ do
var <- AVar.empty
for_ (1..5) \idx -> forkAff $ process idx var
let inputs = [100,200,300,300,400,1000,2000,101,102,103,104]
for_ inputs \i -> AVar.put (Work i) var
for_ (1..6) \_ -> AVar.put Done var
在这个程序中,我的工作项只是数字,表示睡眠的毫秒数。我将其用作每个工作项处理“昂贵”程度的模型。程序输出将是这样的:
Worker 1: Processing 100
Worker 2: Processing 200
Worker 3: Processing 300
Worker 4: Processing 300
Worker 5: Processing 400
Worker 1: Processed 100
Worker 1: Processing 1000
Worker 2: Processed 200
Worker 2: Processing 2000
Worker 3: Processed 300
Worker 3: Processing 101
Worker 4: Processed 300
Worker 4: Processing 102
Worker 5: Processed 400
Worker 5: Processing 103
Worker 3: Processed 101
Worker 3: Processing 104
Worker 4: Processed 102
Worker 5: Processed 103
Worker 3: Processed 104
Worker 1: Processed 1000
Worker 2: Processed 2000