如何在带有 SparkR 的 SparkDataFrame 中使用未定义的变量列表作为列名?

How to use an undefined list of variables as column names in a SparkDataFrame with SparkR?

我在SparkR的世界里不断进步,现在却遇到了无法解决的问题

处理 SparkDataFrame 操作时,我可能想要更新某些列或聚合其他列。我学会了如何根据具体情况进行操作,即逐列进行。

举个例子:

library(SparkR)
library(magrittr)

# Creating SDF
nb.row <- 10 
nb.col <- 10 
m <- matrix(runif(n=nb.row*nb.col, min = 0, max = 100), nb.row, nb.col)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = base::sample(LETTERS[1:2]), V = m))
  1. 如果我想更新列,我可以这样做:
sdf <- withColumn(sdf, "V_1", sdf$V_1 * 1000)
sdf <- withColumn(sdf, "V_2", sdf$V_2 * 1000)
  1. 如果我想聚合列,我可以这样做:
agg1 <- agg(groupBy(sdf, "CODE"), "SV_6" = sum(sdf$V_6), "SV_7" = sum(sdf$V_7))

我的问题是: 当我不知道要处理的列列表时如何处理这些情况? (在 R basic 上很容易,这对我来说在 SparkR 中似乎是无法克服的...)

  1. 回到 更新 的情况。我发现了类似的东西:
list.var.1 <- paste0("V_", 1:5)
for (i in 1:length(list.var.1)) {
  sdf <- withColumn(sdf, list.var.1[i], sdf[[list.var.1[i]]] * 1000)
}

这给了我预期的结果,但它是最简单的脚本吗?没有更轻松或更“官方”的东西了?

  1. 回到聚合的情况。我发现了类似的东西:
# Useful functions
DFjoin <- function(left_df, right_df, key = "key", join_type = "left"){
    left_df <- withColumnRenamed(left_df, key, "left_key")
    right_df <- withColumnRenamed(right_df, key, "right_key")
    result <- join(
        left_df, right_df,
        left_df$left_key == right_df$right_key,
        joinType = join_type)
    result <- withColumnRenamed(result, "left_key", key)
    result$right_key <- NULL
    return(result)
}

sum_spark <- function(res, df, gb, col) {
  Cols <- paste0('S', col)
  tmp <- agg(groupBy(df, gb), alias(sum(df[[col]]), Cols))
  result <- DFjoin(res, tmp, "CODE")
}

# First step to create base SDF called res
res <- SparkR::select(sdf, sdf$CODE) %>% SparkR::distinct()

# Updating res in a for loop with join
for (i in 1:length(list.var.2)){
  res <- sum_spark(res, sdf, "CODE", list.var.2[i])
}

这也给了我预期的结果,但脚本确实看起来很重(根据我的说法,与 R basic 相比)。我错了吗?

我找不到关于此的更多信息。所以一切都有帮助!!

您可以参考 了解如何将 lapply 与其他 SparkR 函数结合使用来获得您想要的东西,而不是使用 for loops

分享一个有用的函数,用于在下面的列列表中使用 SparkR::agg,这将满足您的目的:

#' Apply SparkR aggregate function on list of columns
#'
#' This function acts as a boilerplate for simplifying the code to do
#' aggregation on multiple columns as a list and apply Spark::agg function on
#' that.
#'
#' @param spark_df Spark dataframe (Grouped or ususal) on which some SparkR
#'     aggregate function to be applied
#' @param agg_cols_list List of Spark column object having some aggregate
#'     function
#'
#' @examples \dontrun{
#'   # sdf is a SparkR dataframe having numeric columns "a" & "b"
#'   sdf <- SparkR::createDataFrame(data.frame(a = c(1, 2), b = c(1, 5)))
#'   sparkr_agg_listargs(sdf,
#'     lapply(c("a", "b"), function(x) sum(SparkR::column(x)))
#'   )
#' }
sparkr_agg_listargs <- function(spark_df, agg_cols_list) {
  do.call(SparkR::agg, c(spark_df, agg_cols_list))
}

请有效地使用 SparkR::alias 以获取所需的新列名称。