SparkR gapply - 函数 returns 多行 R 数据帧

SparkR gapply - function returns a multi-row R dataframe

假设我想执行如下操作:

library(SparkR)
...
df = spark.read.parquet(<some_address>)
df.gapply(
    df,
    df$column1,
    function(key, x) {
        return(data.frame(x, newcol1=f1(x), newcol2=f2(x))
    }
)

其中函数的 return 有多行。需要明确的是,文档中的示例(令人遗憾的是,它与 Spark 文档中的许多示例非常简单相呼应)并不能帮助我确定是否会按我的预期进行处理。

我希望这样的结果是,对于在 DataFrame 中创建的 k 个组,每个组有 n_k 个输出行,gapply() 调用的结果将有 sum(1.. k, n_k) 行,其中为键 k 中的每个组的每个 n_k 行复制键值...但是,模式字段向我建议这不是这样的被处理 - 事实上,它表明它要么希望将结果推入一行。

希望这很清楚,尽管是理论上的(很抱歉,我无法分享我的实际代码示例)。有人可以验证或解释这样的功能实际上是如何处理的吗?

the official documentation:

中明确说明了关于输入和输出的确切期望

Apply a function to each group of a SparkDataFrame. The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. The groups are chosen from SparkDataFrames column(s). The output of function should be a data.frame.

Schema specifies the row format of the resulting SparkDataFrame. It must represent R function’s output schema on the basis of Spark data types. The column names of the returned data.frame are set by user. Below is the data type mapping between R and Spark.

换句话说,您的函数应该采用 keydata.frame 对应于该键的行和 return data.frame 可以使用 Spark [=35] 表示=] 类型,模式作为 schema 参数提供。行数没有限制。例如,您可以按如下方式应用身份转换:

df <- as.DataFrame(iris)

gapply(df, "Species", function(k, x) x, schema(df))

与聚合相同的方式:

gapply(df, "Species",
  function(k, x) {
    dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width))
  },
  structType(
    structField("species", "string"),
    structField("max_s_width", "double"))
)

尽管在实践中您应该更喜欢直接在 DataFrame (groupBy %>% agg) 上聚合。