将任务分配给并行工作人员,使预期成本大致相等

Allocating tasks to parallel workers so that expected cost is roughly equal

我有一个分配问题,我试图将一些具有已知预期成本(以秒为单位的运行时间)的任务分配给 X 个并行工作人员,但要遵守每个工作人员接收相同数量任务的约束(保留余数),因此每个工作人员的总预期运行时间大致相等。

我正在使用定义要执行的任务的数据框,并且对于每个任务我都可以计算出相当准确的预期成本(以秒为单位的运行时间)。例如。像这样:

library("tibble")

set.seed(1232)
tasks <- tibble(task = 1:20, cost = runif(20, min = 1, max = 5)^2)
head(tasks)
#> # A tibble: 6 x 2
#>    task  cost
#>   <int> <dbl>
#> 1     1 22.5 
#> 2     2 20.0 
#> 3     3 21.3 
#> 4     4  8.13
#> 5     5 18.3 
#> 6     6 19.6

reprex package (v0.3.0)

于 2019-11-21 创建

然后与 foreach::foreach(...) %dopar% ... 一起使用以并行执行任务。 foreach() 将任务分成大致相等大小的组,大小为 nrow(tasks)/X,其中 X 是并行工作者(核心)的数量。

我目前正在调整任务列表,以便每个工人的成本大致相等,但仍然会有很大的偏差,即一些工人比其他工人早得多完成,因此如果他们有一些更昂贵的任务。例如:

# shuffle tasks (in the original application cost is not random initially)
tasks <- tasks[sample(1:nrow(tasks)), ]

# number of workers
X <- 4
tasks$worker <- rep(1:X, each = nrow(tasks)/X)

# expected total cost (runtime in s) per worker
sapply(split(tasks$cost, tasks$worker), sum)
#>        1        2        3        4 
#> 77.25278 35.25026 66.09959 64.05435

reprex package (v0.3.0)

于 2019-11-21 创建

第二个工人完成的时间是其他工人的一半,所以它的能力被浪费了,整个事情需要更长的时间才能完成。

我想做的是 re-ordering 任务数据框,这样当 foreach 将其分成 X 组时,每组的总预期成本会更均匀。

我想这是一个 super-well 已知的问题,我只是不知道 google 的正确措辞(也不知道如何在 R 中做到这一点)。谢谢你的帮助。

(编辑)大多数更好的选择

目前,一个相对简单的替代方案似乎比随机洗牌效果更好。这按成本对任务进行排序,将前 X 个任务分配给工人 1 到 X,然后将下一个 X 任务块以相反的顺序分配给工人 X 到 1,依此类推(这是下面的 "alt1")。

(EDIT2) 添加了 RcppAlgos 方法

下面是约瑟夫·伍德。

library("tibble")
library("dplyr")
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library("ggplot2")
library("tidyr")
library("RcppAlgos")

getPartitions <- function(df, nWorkers, tol_ratio = 0.0001) {

  nSections <- nrow(df) / nWorkers
  avg <- sum(df$cost) / nWorkers
  tol <- avg * tol_ratio
  vec <- df$cost
  cond <- TRUE
  part <- list()

  for (i in 1:(nWorkers - 1)) {
      while (cond) {
          vals <- comboGeneral(vec, nSections,
                               constraintFun = "sum",
                               comparisonFun = "==",
                               limitConstraints = avg + (tol / 2),
                               tolerance = tol,
                               upper = 1)

          cond <- nrow(vals) == 0

          if (cond) {
              tol <- tol * 2
          } else {
              v <- match(as.vector(vals), df$cost)
          }
      }

      part[[i]] <- v
      vec <- df$cost[-(do.call(c, part))]
      avg <- sum(vec) / (nWorkers - i)
      tol <- avg * tol_ratio
      cond <- TRUE
  }

  part[[nWorkers]] <- which(!1:nrow(df) %in% do.call(c, part))
  part
}

race <- function() {
  N_TASKS = 100
  X = 4
  tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)

  # random shuffle
  tasks$worker <- rep(1:X, each = nrow(tasks)/X)
  rando <- max(sapply(split(tasks$cost, tasks$worker), sum))

  # alternative 1
  tasks <- tasks[order(tasks$cost), ]
  tasks$worker <- rep(c(1:X, X:1), length.out = nrow(tasks))
  alt1 <- max(sapply(split(tasks$cost, tasks$worker), sum))

  # modified version of ivan100sic's answer
  # sort by descending cost, after initial allocation, allocate costly tasks
  # first to workers with lowest total cost so far
  group <- factor(rep(1:(ceiling(nrow(tasks)/4)), each = X))
  tasks <- tasks[order(tasks$cost, decreasing = TRUE), ]
  tasks$worker <- c(1:X, rep(NA, length.out = nrow(tasks) - X))
  task_sets <- split(tasks, group)
  task_sets[[1]]$worker <- 1:X
  for (i in 2:length(task_sets)) {
    # get current total cost by worker
    total <- task_sets %>% 
      bind_rows() %>%
      filter(!is.na(worker)) %>%
      group_by(worker) %>%
      summarize(cost = sum(cost)) %>%
      arrange(cost)
    task_sets[[i]]$worker <- total[["worker"]]
  }
  tasks <- bind_rows(task_sets)
  alt2  <- max(sapply(split(tasks$cost, tasks$worker), sum))

  # RcppAlogs by Joseph Wood below
  setParts <- getPartitions(tasks, X)
  worker   <- rep(1:4, each = N_TASKS/X)
  row_num  <- unsplit(setParts, worker)
  tasks$worker <- worker[order(row_num)]
  algo <- max(sapply(split(tasks$cost, tasks$worker), sum))


  c(ref = sum(tasks$cost) / X, rando = rando, alt1 = alt1, alt2 = alt2, algo = algo)
}

set.seed(24332)
sims <- replicate(1e3, race())
sims <- sims %>%
  t() %>%
  as_tibble() %>%
  pivot_longer(rando:algo, names_to = "Method")

ggplot(sims, aes(x = value, color = Method)) + 
  geom_density() +
  scale_x_continuous(limits = c(0, max(sims$value))) +
  labs(x = "Total runtime (s)")


# this shows the estimated runtime relative to average total cost
# per worker (which may be unobtainable)
sims %>%
  group_by(Method) %>%
  summarize(time_relative_to_ref = mean(value - ref)) %>%
  arrange(time_relative_to_ref)
#> # A tibble: 4 x 2
#>   Method time_relative_to_ref
#>   <chr>                 <dbl>
#> 1 algo                 0.0817
#> 2 alt2                 0.307 
#> 3 alt1                 4.97  
#> 4 rando              154.

reprex package (v0.3.0)

于 2020-02-04 创建

正如@JohnColeman 指出的那样,这基本上归结为分区。我们正在尝试平均分配任务,以使成本总和不会发生太大变化。

下面的算法就是这样做的。主要思想是连续找到一组总和接近平均值的任务。一旦我们找到一个,我们将它们删除,然后继续选择。

下面算法的主力是 comboGeneral 来自 RcppAlgos*。此函数允许找到满足约束条件的向量的组合。在这种情况下,我们正在寻找总和接近 sum(tasks$cost) / (number of workers) ~ 60.66425 的 5 个数字。由于我们正在寻找 close 而不是 exact 的数字,我们可以限制我们的约束。也就是说,我们可以寻找总和在给定公差范围内的组合。

library(RcppAlgos)

getPartitions <- function(df, nWorkers, tol_ratio = 0.0001) {

    nSections <- nrow(df) / nWorkers
    avg <- sum(df$cost) / nWorkers
    tol <- avg * tol_ratio
    vec <- df$cost
    cond <- TRUE
    part <- list()

    for (i in 1:(nWorkers - 1)) {
        while (cond) {
            vals <- comboGeneral(vec, nSections,
                                 constraintFun = "sum",
                                 comparisonFun = "==",
                                 limitConstraints = avg + (tol / 2),
                                 tolerance = tol,
                                 upper = 1)

            cond <- nrow(vals) == 0

            if (cond) {
                tol <- tol * 2
            } else {
                v <- match(as.vector(vals), df$cost)
            }
        }

        part[[i]] <- v
        vec <- df$cost[-(do.call(c, part))]
        avg <- sum(vec) / (nWorkers - i)
        tol <- avg * tol_ratio
        cond <- TRUE
    }

    part[[nWorkers]] <- which(!1:nrow(df) %in% do.call(c, part))
    part
}

OP给出的示例输出如下:

getPartitions(tasks, 4)
[[1]]
[1] 11 13  8 14 10

[[2]]
[1] 12  4 20  2 16

[[3]]
[1] 19  9 18  5  6

[[4]]
[1]  1  3  7 15 17

这些是来自 tasks 的要传递给每个工作人员的行。它立即运行并且 returns 工作量相当均匀。以下是每个工人的估计时间:

sapply(getPartitions(tasks, 4), function(x) {
    sum(tasks$cost[x])
})
[1] 60.67292 60.66552 60.80399 60.51455

考虑到理想时间是 mean(tasks$cost) * 5 ~= 60.66425

让我们看看它的表现如何。下面是一个修改后的绘图脚本,它考虑了给定方法的每个结果的变化程度。我们用 sd(标准差)来衡量这一点。它也是returns理想的解决方案供参考。

library("tibble")
library("dplyr")
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library("ggplot2")
library("tidyr")

race <- function() {
    N_TASKS = 100
    X = 4
    tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)
    ideal_soln <- sum(tasks$cost) / X

    # random shuffle
    tasks$worker <- rep(1:X, each = nrow(tasks)/X)
    rando_mx <- max(sapply(split(tasks$cost, tasks$worker), sum))
    rando_sd <- sd(sapply(split(tasks$cost, tasks$worker), sum))

    # alternative 1
    tasks <- tasks[order(tasks$cost), ]
    tasks$worker <- rep(c(1:X, X:1), length.out = nrow(tasks))
    alt1_mx <- max(sapply(split(tasks$cost, tasks$worker), sum))
    alt1_sd <- sd(sapply(split(tasks$cost, tasks$worker), sum))

    # modified version of ivan100sic's answer
    # sort by descending cost, after initial allocation, allocate costly tasks
    # first to workers with lowest total cost so far
    group <- factor(rep(1:(ceiling(nrow(tasks)/4)), each = X))
    tasks <- tasks[order(tasks$cost, decreasing = TRUE), ]
    tasks$worker <- c(1:X, rep(NA, length.out = nrow(tasks) - X))
    task_sets <- split(tasks, group)
    task_sets[[1]]$worker <- 1:X
    for (i in 2:length(task_sets)) {
        # get current total cost by worker
        total <- task_sets %>% 
            bind_rows() %>%
            filter(!is.na(worker)) %>%
            group_by(worker) %>%
            summarize(cost = sum(cost)) %>%
            arrange(cost)
        task_sets[[i]]$worker <- total[["worker"]]
    }
    tasks <- bind_rows(task_sets)
    alt2_mx  <- max(sapply(split(tasks$cost, tasks$worker), sum))
    alt2_sd  <- sd(sapply(split(tasks$cost, tasks$worker), sum))

    ## RcppAlgos solution
    setParts <- getPartitions(tasks, X)
    algos_mx <- max(sapply(setParts, function(x) sum(tasks$cost[x])))
    algos_sd <- sd(sapply(setParts, function(x) sum(tasks$cost[x])))

    c(target_soln = ideal_soln,rando_max = rando_mx, alt1_max = alt1_mx,
      alt2_max = alt2_mx, algos_max = algos_mx, rando_std_dev = rando_sd,
      alt1_std_dev = alt1_sd, alt2_std_dev = alt2_sd, algos_std_dev = algos_sd)
}

set.seed(24332)
system.time(sims <- replicate(1e3, race()))
sims %>%
    t() %>%
    as_tibble() %>%
    pivot_longer(rando_std_dev:algos_std_dev, names_to = "Method") %>%
    ggplot(aes(x = value, color = Method)) + 
    geom_density() +
    scale_x_continuous(limits = c(0, 100)) +
    labs(x = "Standard Deviation (s)")
Warning message:
Removed 719 rows containing non-finite values (stat_density).

很难判断是怎么回事,因为 rando 方法的标准差太大了。如果我们只看 alt1alt2algos 方法,我们有:

sims %>%
    t() %>%
    as_tibble() %>%
    pivot_longer(alt1_std_dev:algos_std_dev, names_to = "Method") %>%
    ggplot(aes(x = value, color = Method)) + 
    geom_density() +
    scale_x_continuous(limits = c(0, 5)) +
    labs(x = "Standard Deviation (s)")
Warning message:
Removed 335 rows containing non-finite values (stat_density)

现在 alt2algos:

sims %>%
    t() %>%
    as_tibble() %>%
    pivot_longer(alt2_std_dev:algos_std_dev, names_to = "Method") %>%
    ggplot(aes(x = value, color = Method)) + 
    geom_density() +
    scale_x_continuous(limits = c(0, 1.7)) +
    labs(x = "Standard Deviation (s)")

如您所见,RcppAlgos 解决方案每次都能提供最平衡的负载。

最后,这是一个演示每个方法与目标解决方案有多接近的插图:

summary(abs(t(sims)[, "algos_max"] - t(sims)[, "target_soln"]))
    Min.  1st Qu.   Median     Mean  3rd Qu.     Max. 
0.003147 0.057913 0.081986 0.081693 0.106312 0.179099 

summary(abs(t(sims)[, "alt2_max"] - t(sims)[, "target_soln"]))
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
0.01175 0.14321 0.23916 0.30730 0.40949 2.03156

summary(abs(t(sims)[, "alt1_max"] - t(sims)[, "target_soln"]))
  Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
0.4979  2.9815  4.4725  4.9660  6.3220 16.5716 

summary(abs(t(sims)[, "rando_max"] - t(sims)[, "target_soln"]))
 Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
13.16   98.14  143.64  154.10  200.41  427.81

我们看到 RcppAlgos 解决方案比第二最佳方法(在本例中为 alt2)平均接近目标解决方案 3-4 倍。

更新

在大多数情况下,alt2/alt1 方法执行得相对较好并且非常简单,这是一个巨大的优势。但是,在很多情况下它们会失败。例如,给定 X 工人和 X - 1您知道的任务比其他任务花费的时间要长得多,因为这些方法依赖于排序,可以预见它们会分配太多给 X - 1 工作人员。只需更改函数 race() 中的以下行:

## Original
tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)

## Modified
tasks <- tibble(task = 1:N_TASKS, cost = c(runif(X - 1, 15, 25),
                                           runif(N_TASKS - X + 1, min = 1, max = 10))^2)

现在重新运行并观察:

set.seed(24332)
sims <- replicate(1e3, race())
sims <- sims %>%
    t() %>%
    as_tibble() %>%
    pivot_longer(rando:algo, names_to = "Method")

ggplot(sims, aes(x = value, color = Method)) + 
    geom_density() +
    scale_x_continuous(limits = c(0, max(sims$value))) +
    labs(x = "Total runtime with Large Gap (s)")

sims %>%
    group_by(Method) %>%
    summarize(time_relative_to_ref = mean(value - ref)) %>%
    arrange(time_relative_to_ref)
# A tibble: 4 x 2
Method time_relative_to_ref
<chr>                 <dbl>
1 algo                  0.109
2 alt2                150.   
3 alt1                184.   
4 rando               839.

虽然这是一个人为的例子,但它表明,由于 alt1/alt2 解决方案对基础数据进行了假设,因此当遇到更普遍的问题时,它不可避免地会失败。

* 披露:我是 RcppAlgos

的作者

以下试探法可能会给您带来不错的结果:

按成本降序对所有任务进行排序。对于每个任务,将其分配给迄今为止总分配成本最低的工人。