multidplyr 和 group_by () 和 filter()
multidplyr and group_by () and filter()
我有以下数据框,我的目的是找到所有具有不同 USAGE 但相同 TYPE 的 ID。
ID <- rep(1:4, each=3)
USAGE <- c("private","private","private","private",
"taxi","private","taxi","taxi","taxi","taxi","private","taxi")
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW")
df <- data.frame(ID,USAGE,TYPE)
如果我运行
df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
我得到了预期的结果。但我的原始数据框有超过 200 万行。所以我想在 运行 执行此操作时使用我所有的核心。
我用 multidplyr 试过这段代码:
f1 <- partition(df, ID)
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
f3 <- collect(f2)
但随后出现以下消息:
Warning message: group_indices_.grouped_df ignores extra arguments
之后
f1 <- partition(df, ID)
和
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
4 nodes produced errors; first error: Evaluation error: object 'f1' not found.
在
之后
f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1)
将整个操作实施到 multidplyr 中的正确方法是什么?非常感谢。
您应该在对 partition()
的调用中包含所有分组变量。这样每个核心都拥有为给定组执行计算所需的所有数据。
library(tidyverse)
library(multidplyr)
fast <- df %>%
partition(ID, TYPE) %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1) %>%
collect()
验证
您仍然会收到有关 group_indices 的警告,但结果与原始 dplyr
方法相同。
slow <- df %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1)
fast == slow
ID USAGE TYPE
#[1,] TRUE TRUE TRUE
#[2,] TRUE TRUE TRUE
#[3,] TRUE TRUE TRUE
基准测试
现在有个大问题:它更快吗?定义 cluster
让我们确保我们使用所有核心。
library(microbenchmark)
library(parallel)
cluster <- create_cluster(cores = detectCores())
fast_func <- function(df) {
df %>%
partition(ID, TYPE, cluster = cluster) %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1) %>%
collect()
}
slow_func <- function(df) {
slow <- df %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1)
}
microbenchmark(fast_func(df), slow_func(df))
# Unit: milliseconds
# expr min lq mean median uq max neval cld
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a
在这种情况下,使用并行处理实际上更慢。 fast_func
的中位数 运行 需要 56 毫秒而不是 9 毫秒。这是因为与管理跨集群数据流相关的开销。但是你说你的数据有几百万行,那我们试试吧。
# Embiggen the data
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df()
microbenchmark(fast_func(df), slow_func(df))
# Unit: seconds
# expr min lq mean median uq max neval cld
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a
庞大的数据集,fast_func
还是比较慢!有时 运行 并行会节省大量时间,但简单的分组过滤器不一定是其中之一。
我有以下数据框,我的目的是找到所有具有不同 USAGE 但相同 TYPE 的 ID。
ID <- rep(1:4, each=3)
USAGE <- c("private","private","private","private",
"taxi","private","taxi","taxi","taxi","taxi","private","taxi")
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW")
df <- data.frame(ID,USAGE,TYPE)
如果我运行
df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
我得到了预期的结果。但我的原始数据框有超过 200 万行。所以我想在 运行 执行此操作时使用我所有的核心。
我用 multidplyr 试过这段代码:
f1 <- partition(df, ID)
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1)
f3 <- collect(f2)
但随后出现以下消息:
Warning message: group_indices_.grouped_df ignores extra arguments
之后
f1 <- partition(df, ID)
和
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
4 nodes produced errors; first error: Evaluation error: object 'f1' not found.
在
之后f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1)
将整个操作实施到 multidplyr 中的正确方法是什么?非常感谢。
您应该在对 partition()
的调用中包含所有分组变量。这样每个核心都拥有为给定组执行计算所需的所有数据。
library(tidyverse)
library(multidplyr)
fast <- df %>%
partition(ID, TYPE) %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1) %>%
collect()
验证
您仍然会收到有关 group_indices 的警告,但结果与原始 dplyr
方法相同。
slow <- df %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1)
fast == slow
ID USAGE TYPE
#[1,] TRUE TRUE TRUE
#[2,] TRUE TRUE TRUE
#[3,] TRUE TRUE TRUE
基准测试
现在有个大问题:它更快吗?定义 cluster
让我们确保我们使用所有核心。
library(microbenchmark)
library(parallel)
cluster <- create_cluster(cores = detectCores())
fast_func <- function(df) {
df %>%
partition(ID, TYPE, cluster = cluster) %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1) %>%
collect()
}
slow_func <- function(df) {
slow <- df %>%
group_by(ID, TYPE) %>%
filter(n_distinct(USAGE) > 1)
}
microbenchmark(fast_func(df), slow_func(df))
# Unit: milliseconds
# expr min lq mean median uq max neval cld
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a
在这种情况下,使用并行处理实际上更慢。 fast_func
的中位数 运行 需要 56 毫秒而不是 9 毫秒。这是因为与管理跨集群数据流相关的开销。但是你说你的数据有几百万行,那我们试试吧。
# Embiggen the data
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df()
microbenchmark(fast_func(df), slow_func(df))
# Unit: seconds
# expr min lq mean median uq max neval cld
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a
庞大的数据集,fast_func
还是比较慢!有时 运行 并行会节省大量时间,但简单的分组过滤器不一定是其中之一。