具有并发限制的并行(Aff)执行?

Parallel (Aff) execution with concurrent limit?

在Aff方面并发进程限制下实现并行执行的方式是什么?我相信标准库中没有方法,也没有找到一个很好的完整答案。

parSequenceWithLmit :: Array (Aff X) -> Int -> Aff (Array X)

Aff X 计算应该并行进行,但不能超过给定的 N 个并发计算。所以它开始N个cals,当一个完成时,下一个(左边的)开始。

对于这种事情,一个好的机制是 AVar,它是一个阻塞的可变单元格。它在概念上可以被认为是一个单元素阻塞队列。

第一个,一个AVar可能是空的也可能是满的。您可以使用 empty, and then you can "fill" it with a value using put 创建一个空的。这里有用的一点是,当你调用 put 并且 AVar 已经“满”时,put 阻塞 直到它再次为空。

Second,您可以使用 take 读取值,这将 return 您的值,但将 AVar 留空同一时间。与 put 类似,如果 AVar 为空,take 将阻塞直到填满。

所以你可以用它做以下事情:

  1. 创建单个 AVar.
  2. 分叉 N 个进程,每个进程都会 take 来自 AVar 的值并处理它,然后循环。永远。
  3. 有一个协调进程,它将遍历整个工作序列和 put 个工作项到 AVar

当所有工作进程都忙时,orchestrator 进程会向 AVar 中推送另一个值,然后会尝试推送下一个,但此时会阻塞,因为 AVar 已经满了。它将保持阻塞状态,直到其中一个工作进程完成其工作并调用 take 以获取下一个工作项,使 AVar 为空。这将解除阻塞 orchestrator 进程,它会立即将下一个工作项推送到 AVar,依此类推。

这里缺少的一点是如何停止。如果工作进程只是无限循环,它们将永远不会退出。当协调器进程最终用完工作并停止填充 AVar 时,工作进程将永远阻塞在 take 调用上。不好。

所以要解决这个问题,有两种工作项 - (1) 实际工作和 (2) 停止处理的命令。然后让 orchestrator 进程首先推送所有工作项,一旦完成,推送 N 个命令停止。您可以选择推送 N+1 个命令来停止:这将保证编排程序进程阻塞,直到最后一个工作人员完成。

将所有这些放在一起,这是一个演示程序:

module Main where

import Prelude

import Data.Array ((..))
import Data.Foldable (for_)
import Data.Int (toNumber)
import Effect (Effect)
import Effect.AVar (AVar)
import Effect.Aff (Aff, Milliseconds(..), delay, forkAff, launchAff_)
import Effect.Aff.AVar as AVar
import Effect.Class (liftEffect)
import Effect.Console (log)

data Work a = Work a | Done

process :: Int -> AVar (Work Int) -> Aff Unit
process myIndex v = do
  w <- AVar.take v
  case w of
    Done ->
      pure unit
    Work i -> do
      liftEffect $ log $ "Worker " <> show myIndex <> ": Processing " <> show i
      delay $ Milliseconds $ toNumber i
      liftEffect $ log $ "Worker " <> show myIndex <> ": Processed " <> show i
      process myIndex v

main :: Effect Unit
main = launchAff_ do
  var <- AVar.empty
  for_ (1..5) \idx -> forkAff $ process idx var

  let inputs = [100,200,300,300,400,1000,2000,101,102,103,104]
  for_ inputs \i -> AVar.put (Work i) var

  for_ (1..6) \_ -> AVar.put Done var

在这个程序中,我的工作项只是数字,表示睡眠的毫秒数。我将其用作每个工作项处理“昂贵”程度的模型。程序输出将是这样的:

Worker 1: Processing 100
Worker 2: Processing 200
Worker 3: Processing 300
Worker 4: Processing 300
Worker 5: Processing 400
Worker 1: Processed 100
Worker 1: Processing 1000
Worker 2: Processed 200
Worker 2: Processing 2000
Worker 3: Processed 300
Worker 3: Processing 101
Worker 4: Processed 300
Worker 4: Processing 102
Worker 5: Processed 400
Worker 5: Processing 103
Worker 3: Processed 101
Worker 3: Processing 104
Worker 4: Processed 102
Worker 5: Processed 103
Worker 3: Processed 104
Worker 1: Processed 1000
Worker 2: Processed 2000