Error handling for split-apply-combine strategy in Sparklyr
I have a Spark DataFrame with an ID column called "userid" that I am manipulating using sparklyr. Each userid can contain anywhere from one row of data to hundreds of rows of data. I am applying a function to each userid group that condenses the rows it contains based on certain event criteria, something like:
sdf %>%
  group_by(userid) %>%
  ... %>% # using dplyr::filter and dplyr::mutate
  ungroup()
I want to wrap this function in an error handler such as purrr::possibly() so that the computation is not interrupted if a single group throws an error.
So far I have had the most success using the replyr package. Specifically, replyr::gapply "partitions from by values in grouping column, applies a generic transform to each group and then binds the groups back together." There are two methods for partitioning the data: "group_by" and "extract". The author only recommends using "extract" if the number of groups is 100 or fewer, but the "group_by" method does not work as I expected:
library(sparklyr)
library(dplyr)
library(replyr) # replyr::gapply
library(purrr)  # purrr::possibly

sc <- spark_connect(master = "local")

# Create a test data frame to use gapply on.
test_spark <- tibble(
  userid = c(1, 1, 2, 2, 3, 3),
  occurred_at = seq(1, 6)
) %>%
  sdf_copy_to(sc, ., "test_spark")

# Create a data frame that purrr::possibly should return in case of error.
default_spark <- tibble(userid = -1, max = -1, min = -1) %>%
  sdf_copy_to(sc, ., "default_spark")
#####################################################
# Method 1: gapply with partitionMethod = "group_by".
#####################################################
# Create a function which may throw an error. The group column, userid, is not
# included since gapply( , partitionMethod = "group_by") creates it.
# - A print statement is included to show that when gapply uses "group_by", the
# function is only called once.
fun_for_groups <- function(sdf) {
  temp <- sample(c(1, 2), 1)
  print(temp)
  if (temp == 2) {
    log("a")  # throws an error: non-numeric argument to a mathematical function
  } else {
    sdf %>%
      summarise(max = max(occurred_at),
                min = min(occurred_at))
  }
}
# Wrap the risky function to try and handle the error gracefully.
safe_for_groups <- purrr::possibly(fun_for_groups, otherwise = default_spark)
# Apply the safe function to each userid using gapply and "group_by".
# - The result is either a) only the default_spark data frame.
# b) the result expected if no error occurs in fun_for_groups.
# I would expect the answer to have a mixture of default_spark rows and correct rows.
replyr::gapply(
  test_spark,
  gcolumn = "userid",
  f = safe_for_groups,
  partitionMethod = "group_by"
)
#####################################################
# Method 2: gapply with partitionMethod = "extract".
#####################################################
# Create a function which may throw an error. The group column, userid, is
# included since gapply( , partitionMethod = "extract") doesn't create it.
# - Include a print statement to show that when gapply uses partitionMethod
#   "extract", the function is called for each userid.
fun_for_extract <- function(df) {
  temp <- sample(c(1, 2), 1)
  print(temp)
  if (temp == 2) {
    log("a")
  } else {
    df %>%
      summarise(max = max(occurred_at),
                min = min(occurred_at),
                userid = min(userid))
  }
}
safe_for_extract <- purrr::possibly(fun_for_extract, otherwise = default_spark)
# Apply that function to each userid using gapply and "extract".
# - The result dataframe has a mixture of "otherwise" rows and correct rows.
replyr::gapply(
  test_spark,
  gcolumn = "userid",
  f = safe_for_extract,
  partitionMethod = "extract"
)
How bad an idea is it to use gapply when the grouping column has millions of values? And is there an alternative to the error-handling strategy above?
replyr::gapply() is only a thin wrapper on top of dplyr (in this case sparklyr).

For the grouped mode, the result is only correct if no group errors out, as the calculation is issued all at once. This is the most efficient mode, but it can't really implement any kind of error handling.
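In other words, in the "group_by" mode your wrapped function runs exactly once over the whole grouped tbl, so purrr::possibly() can only return either the complete result or the single otherwise value. A simplified sketch of that call pattern (not replyr's actual internals):

# Rough sketch of the "group_by" path: one call of the wrapped function on
# the whole grouped Spark tbl, so any error replaces the *entire* result
# with the `otherwise` value passed to purrr::possibly().
grouped_result <- safe_for_groups(group_by(test_spark, userid)) %>%
  ungroup()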
For the extract mode: error handling could be added, but the current code does not have it.
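The extract mode, by contrast, processes one group per query, which matches the observation in the question that the possibly()-wrapped function produces a mixture of default and correct rows there, and is also why it is recommended only for a modest number of groups. Roughly the same pattern can be written directly against dplyr/sparklyr without replyr; this is a sketch under the assumption that sdf_bind_rows() is used to union the per-group results:

# Extract-style loop without replyr: pull the distinct group values into R,
# run the safe function on each filtered group, then bind the pieces.
userids <- test_spark %>% distinct(userid) %>% pull(userid)
per_group <- lapply(userids, function(uid) {
  test_spark %>%
    filter(userid == !!uid) %>%  # inject the local value into the remote query
    safe_for_extract()
})
result <- do.call(sdf_bind_rows, per_group)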
As the author of replyr, I would actually suggest looking into sparklyr's spark_apply() method. replyr's gapply was designed back when spark_apply() was not yet available in sparklyr (and when binding lists of data was also not yet available in sparklyr).

Also, replyr is mostly in "maintenance mode" (patching issues for clients who use it in larger projects) and is probably not a good choice for new projects.
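To make that suggestion concrete, here is a minimal sketch of the spark_apply() route, using its group_by argument; the helper name summarise_or_default is purely illustrative, not part of any package. The per-group function runs as plain R on the executors, so the error handling has to live inside the function itself (tryCatch() rather than a driver-side purrr::possibly()):

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

test_spark <- sdf_copy_to(
  sc,
  tibble(userid = c(1, 1, 2, 2, 3, 3), occurred_at = 1:6),
  "test_spark",
  overwrite = TRUE
)

# Called once per userid group on the executors; errors are caught here,
# because a wrapper around the driver-side spark_apply() call never sees them.
summarise_or_default <- function(df) {
  tryCatch(
    data.frame(max = max(df$occurred_at), min = min(df$occurred_at)),
    error = function(e) data.frame(max = -1, min = -1)
  )
}

result <- spark_apply(
  test_spark,
  summarise_or_default,
  group_by = "userid"
)

Note that spark_apply() ships the R closure to the workers, so each worker node needs a compatible R installation; that is the main operational trade-off compared with staying inside dplyr translations.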