使用 Sparklyr 获取 Spark Dataframe 最大值的索引

Get index of max of Spark Dataframe with Sparklyr

我正在尝试从 Spark 数据帧中获取一行中最大值的索引。获得最大值很简单。我执行以下操作:

library(sparklyr)
library(dplyr)

config <- spark_config()
sc <- spark_connect(master = "local", config = config)

df <- replicate(n = 3, sample(x = 0:10,size = 10, rep=TRUE)) %>%
  as.data.frame()


sdf <- sdf_copy_to(sc, df, overwrite = T)

sdf %>% spark_apply(
  function(df) {
    return( pmax(df[1], df[2], df[3]) )})

我尝试使用 ft_vector_assembler 将它们收集到一个向量中,但我不熟悉返回的数据结构。例如,我无法从以下代码中恢复 max

sdf %>% ft_vector_assembler(
  input_cols = c("V1", "V2", "V3"), 
  output_col = "features") %>%
  select(features) %>%
  spark_apply( function(df) pmax(df))

感谢任何帮助。

让我们从您的第一个问题开始:

It's straight forward to get the maximum value.

的确如此,但是 spark_apply 只是行不通。相反,最好使用 greatest 函数:

sdf %>% mutate(max = greatest(V1, V2, V3))

同样的函数可以用于你的第二个问题,但是由于sparklyr限制,你必须直接使用SQL表达式:

expr <- c("V1", "V2", "V3") %>% 
  paste0(
    "CAST(STRUCT(`",
    ., "`, ", seq_along(.),
    ") AS struct<value: double, index: double>)", collapse=", ")  %>% 
  paste0("greatest(", ., ").index AS max_index")

sdf %>% 
  spark_dataframe() %>%
  invoke("selectExpr", list("*", expr)) %>%
  sdf_register()

在 Spark 2.4 中(目前 sparklyr 不支持)它 可能 可能

sdf %>% mutate(max
  max_index = array_max(arrays_zip(array(V1, V2, V3), array(1, 2, 3))).1
)