R 中的 furrr 包不会将作业分散到所有内核?

furrr package in R doesn't keep spreading the jobs across all cores?

我的任务是计算余弦差异。

给定一个用户观察的数据框,我在每对行之间执行余弦差异。

长话短说,我正在使用 furrr::future_map2_dfr 函数将计算分散到我拥有的所有内核上。

出于某种原因,当一些内核空闲而其他内核正在努力工作时,它们的工作不会继续分散到其他内核。

例如:

这里是起点:

现在正在计算中:

为什么核心 1、2、5、6、8、11、12、15 不参与并共享剩余的作业?

与其他计算相同。

我是否错过了任何可以改变当前行为的 furrr 设置?

P.S

现在有 5 个核心工作 "hard",出于某种原因,furrr 没有将他们的工作分散到所有 16 个核心以使其更快。

函数:

dissimilarity_wrapper <- function(n_users, 
                                  train_data, 
                                  train_data_std, 
                                  test_data, 
                                  std_thresh = 0.5) {

  # NOTE:
  # n_users must be set to maximum users in order to make this function
  # work properly.

  # Generating the options:

  user_combinations <- expand.grid(i = seq_len(n_users),
                                   j = seq_len(n_users))

  plan(strategy = multicore)

  expand_grid_options <- furrr::future_map2_dfr(.x = user_combinations$i,
                                                .y = user_combinations$j,
                                                function(x, y) { 
                                                  expand.grid(test_idx = which(test_data$user_id == x),
                                                              train_idx = which(train_data$user_id == y))})

  drop <- c("user_id", "row_num", 
            "obs_id", "scroll_id", 
            "time_stamp", "seq_label", 
            "scroll_length")

  test <- test_data[expand_grid_options$test_idx, !names(test_data) %in% drop]

  train <- train_data[expand_grid_options$train_idx, !names(train_data) %in% drop]

  train_std <- train_data_std[expand_grid_options$train_idx, ]

  # Calculate different D's:

  D_manhattan_scaled <- (abs(test - train) / train_std) %>% rowSums()

  D_cosinus <- 1 - (rowSums(test * train) / (sqrt(rowSums(test^2) * rowSums(train^2))))

  train_std[train_std < std_thresh] <- 1

  D_manhattan_scaled_adj_std <- (abs(test - train) / train_std) %>% rowSums()

  D_manhattan <- (abs(test - train)) %>% rowSums()

  return(expand_grid_options %>%
           dplyr::mutate(
             D_manhattan_scaled = D_manhattan_scaled,
             D_cosinus = D_cosinus,
             D_manhattan_scaled_adj_std = D_manhattan_scaled_adj_std,
             D_manhattan = D_manhattan,
             isSame = test_data[test_idx, ]$user_id == train_data[train_idx, ]$user_id))

}


train_test_std_split <- function(data, 
                                 train_size, 
                                 test_size, 
                                 feature_selection) {

  train_set <- data %>% 
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= train_size) %>%
    dplyr::ungroup()

  if (length(feature_selection) > 1) {

    # Manual:
    # scaling_param_est <- scale_param_est_total_UG

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"), 
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    data.table::as.data.table() %>%
    dplyr::ungroup() %>% 
    dplyr::as_tibble() %>%
    dplyr::arrange(time_stamp) 

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                dplyr::group_by(user_id) %>%
                dplyr::summarize_at(feature_selection, sd) %>%
                dplyr::rename_at(vars(-"user_id"), 
                                 funs(paste0(feature_selection, "_sd"))), by = "user_id") %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>% 
    dplyr::select(matches("_sd"))

  test_set_unscaled <- data %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(!(obs_id %in% train_set$obs_id)) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= test_size) %>%
    dplyr::ungroup()

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set_unscaled, scaling_param_est)
  test_set_unscaled_joined_with_scaling_params <- test_set_unscaled %>%
    dplyr::left_join(scaling_param_est, by = "user_id")

  test_set_unscaled_joined_with_scaling_params[, feature_selection] <-
    (test_set_unscaled_joined_with_scaling_params[, feature_selection] - 
       test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_unscaled_joined_with_scaling_params %>%
    dplyr::select(user_id, obs_id, scroll_id, 
                  time_stamp, row_num, scroll_length, 
                  feature_selection)


  # Validate:

  # intersect(unique(test_set$obs_id), unique(train_set$obs_id))

  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id, 
  #                         -time_stamp, -scroll_length), 
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>% 
  #   dplyr::ungroup()

  return(list("train_set" = train_set,
              "train_set_std" = train_set_std,
              "test_set" = test_set,
              "test_set_unscaled" = test_set_unscaled))

}

build_dissimilarity_rank <- function(n_users, 
                                     train_set, 
                                     train_set_std, 
                                     test_set, 
                                     D_type = "D_cosinus") {

  return(dissimilarity_wrapper(n_users, train_set, train_set_std, test_set) %>% 
           dplyr::mutate(train_user_id = train_set[train_idx, ]$user_id,
                         test_user_id = test_set[test_idx, ]$user_id) %>%
           dplyr::select(test_idx, 
                         train_user_id, 
                         test_user_id, 
                         train_idx, 
                         D_manhattan_scaled,
                         D_cosinus,
                         D_manhattan_scaled_adj_std,
                         D_manhattan,
                         isSame) %>%
           dplyr::group_by(test_idx, train_user_id) %>%
           dplyr::arrange(train_user_id, !!rlang::sym(D_type)) %>%
           dplyr::mutate(D_manhattan_rank = rank(D_manhattan),
                         D_manhattan_scaled_rank = rank(D_manhattan_scaled, ties.method = "first"),
                         D_cosinus_rank = rank(D_cosinus, ties.method = "first")) %>%
           dplyr::ungroup())

}

build_param_est <- function(dissimilarity_rank,
                            K,
                            D_type_rank = "D_manhattan_scaled") {

  return(dissimilarity_rank %>%
           dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
           dplyr::group_by(train_user_id) %>%
           dplyr::summarise_at(vars(D_manhattan_scaled,
                                    D_cosinus,
                                    D_manhattan_scaled_adj_std,
                                    D_manhattan),
                               funs(mean, median, sd, quantile(., probs = .9))) %>%
           dplyr::rename_at(vars(matches("_quantile")),
                            funs(str_replace(., "_quantile", "_percentile_90"))) %>%
           dplyr::rename_at(vars(matches("_sd")),
                            funs(str_replace(., "_sd", "_std")))
  )
}

build_dissimilarity_table <- function(dissimilarity_rank,
                                      param_est,
                                      K,
                                      i,
                                      D_type_rank = "D_manhattan_scaled",
                                      D_s = c("D_manhattan_scaled",
                                              "D_cosinus",
                                              "D_manhattan_scaled_adj_std",
                                              "D_manhattan")) {

  dissimilarity_table <- dissimilarity_rank %>%
    dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
    dplyr::left_join(param_est, by = c("train_user_id")) %>%
    dplyr::ungroup()

  dissimilarity_table[paste0(D_s, "_norm_standard")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_mean")]) /
    dissimilarity_table[paste0(D_s, "_std")]

  dissimilarity_table[paste0(D_s, "_norm_median")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_median")]) /
    (dissimilarity_table[paste0(D_s, "_percentile_90")] - dissimilarity_table[paste0(D_s, "_median")])

  # dplyr::mutate(experiment = i))

  return(dissimilarity_table)

}

k_fold_data_prepare <- function(df, min_scroll_len = 3) {

  # Given the data, split it by user id:

  return(df %>%
           dplyr::filter(scroll_length >= min_scroll_len) %>%
           dplyr::arrange(time_stamp) %>%
           dplyr::ungroup() %>%
           split(.$user_id))

}

k_fold_engine <- function(df, 
                          obs, 
                          n_users, 
                          K = 2, 
                          feature_selection,
                          D_type = "D_cosinus") {

  # Train - Test Split:

  train_set <- df %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(obs_id != obs)

  if (length(feature_selection) > 1) {

   # Manual:  
   # scaling_param_est <- scale_param_est_total_UG
    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), 
                          funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"), 
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    as.data.table() %>%
    dplyr::ungroup() %>% 
    as_tibble()

  # Compute std for each train variable:

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                       dplyr::group_by(user_id) %>%
                       dplyr::summarize_at(feature_selection, sd) %>%
                       dplyr::rename_at(vars(-"user_id"), 
                                        funs(paste0(feature_selection, "_sd"))), by = "user_id") %>%
    dplyr::select(matches("_sd"))

  test_set <- df %>%
    dplyr::filter(obs_id == obs)

  test_set_joined_with_scaling_params <- test_set %>%
    dplyr::left_join(scaling_param_est, by = "user_id") %>%
    dplyr::arrange(time_stamp)

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set, scaling_param_est)

  test_set_joined_with_scaling_params[, feature_selection] <-
    (test_set_joined_with_scaling_params[, feature_selection] - 
       test_set_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_joined_with_scaling_params %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::select(user_id, obs_id, scroll_id, 
                  time_stamp, row_num, scroll_length, 
                  feature_selection)

  # Compute std for each train variable:
  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id, 
  #                         -time_stamp, -scroll_length), 
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>% 
  #   dplyr::ungroup()
  # 
  # train_set_std <- dplyr::left_join(train_set, 
  #                            compute_std, 
  #                            by = "user_id") %>%
  #   dplyr::ungroup() %>%
  #   dplyr::select(matches("_std$"))

  # Compute the dissimilarities:

  return(build_dissimilarity_rank(n_users, 
                                  train_set, 
                                  train_set_std, 
                                  test_set,
                                  D_type))

}

k_fold_wrapper <- function(data_df, 
                           K = 2, 
                           D_type_rank = "D_cosinus",
                           feature_selection) {

  data_seqed <- k_fold_data_prepare(data_df)

  # Given the data splitted by user id, split it by observation id:

  data_seqed_by_obs <- future_imap(data_seqed, ~split(., .$obs_id ))

  # Get the observation ids per each splitted sub dataframe:

  obs_ids <- future_imap(data_seqed_by_obs, ~as.integer(names(.)))

  # Feed kfold engine with splitted data by user id and observations names:

  plan(strategy = multicore)

  dissimilarity_rank <- furrr::future_map_dfr(data_seqed, function(x) {

    furrr::future_map_dfr(obs_ids[[as.character(x$user_id[1])]], 

                          function(df, 
                                   obs, 
                                   n_users, 
                                   K, 
                                   feature_selection,
                                   D_type_rank) { 

                            k_fold_engine(df, 
                                          obs, 
                                          n_users, 
                                          K, 
                                          feature_selection,
                                          D_type_rank) }, 

                          df = x, n_users = x$user_id[1], 

                          K = K, feature_selection = feature_selection,

                          D_type = D_type_rank) }  ) 


  if(nrow(dissimilarity_rank[which(rowSums(is.na(dissimilarity_rank)) > 0), ])) {

    dissimilarity_rank <- dissimilarity_rank[which(rowSums(is.na(dissimilarity_rank)) == 0), ] %>%
      dplyr::mutate(row_num = row_number())

  }

  param_estimations <- dissimilarity_rank %>% 
    build_param_est(K, D_type_rank = D_type_rank)

  # Summarize and return final param estimation (average):

  # return(param_estimations %>% 
  #          dplyr::group_by(train_user_id) %>%
  #          summarize_at(vars(-"train_user_id"), mean))

  return(list("dissimilarity_rank" = dissimilarity_rank, 
              "param_estimations" = param_estimations))

}

导致问题的最终脚本:

n_users <- max(unique(data$user_id))

train_df <- data %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= 50)

filter_users_low_amount_obs <- train_df %>%
  dplyr::group_by(user_id) %>%
  dplyr::summarise(n_obs = length(unique(obs_id))) %>% 
  dplyr::arrange(n_obs) %>%
  dplyr::filter(n_obs >= 3) %>%
  select(user_id)

train_df <- train_df %>% 
  filter(user_id %in% filter_users_low_amount_obs$user_id)

k_fold_d_rank_param_est <- k_fold_wrapper(train_df, K, D_type_rank = D_type, feature_selection)

dissimilarity_rank_1 <- k_fold_d_rank_param_est$dissimilarity_rank

param_est <- k_fold_d_rank_param_est$param_estimations

train_test_std_split_2 <- train_test_std_split(data, 
                                               train_size_2, 
                                               test_size = Inf,
                                               feature_selection)

dissimilarity_rank_2 <- build_dissimilarity_rank(n_users, 
                                                 train_test_std_split_2$train_set, 
                                                 train_test_std_split_2$train_set_std, 
                                                 train_test_std_split_2$test_set)

我认为您缺少的选项是 furrr 的调度选项。默认情况下,您的数据被分成与您在 future_map 调用开始时指定的工作人员一样多的块,然后每个工作人员被分配一个块来处理。一旦工作人员完成了它的块,它将寻找另一个块并开始处理它。如果没有剩余的块,worker 将空闲。

您可以使用调度选项指定每个工作人员应将数据分成多少块。例如 .options = furrr_options(scheduling = 2) 将为每个工作人员创建两个块,提前完成的工作人员将开始处理另一个块。

有关更多信息,请参阅有关分块的小插图 https://davisvaughan.github.io/furrr/articles/articles/chunking.html

PS:您的代码中有一些嵌套的未来调用,具体取决于您指定的 future::plan() 这只会减慢代码