Calling spark window functions in R using sparklyr

I have been trying to replicate the following pyspark snippet in sparklyr, without success.

from pyspark.sql.window import Window
from pyspark.sql.functions import concat, col, lit, approx_count_distinct

df = spark.sql("select * from mtcars")

# Concatenate gear and carb into a single column, then approximate-count
# its distinct values over a window partitioned by cyl.
dff = df.withColumn("test", concat(col("gear"), lit(" "), col("carb")))
w = Window.partitionBy("cyl").orderBy("cyl")

dff.withColumn("distinct", approx_count_distinct("test").over(w)).show()

I did manage to get the concat bit working, like this:

tbl(sc, "mtcars") %>%
  spark_dataframe() %>%
  invoke("withColumn",
         "test",
         # expr() builds a Column from a SQL expression string
         invoke_static(sc, "org.apache.spark.sql.functions",
                       "expr", "concat(gear, ' ', carb)")) %>%
  sdf_register()  # wrap the Java object back into a sparklyr tbl

What I can't seem to figure out is how to call Window.partitionBy() and Window.orderBy():

# Doesn't work
w <- invoke_static(sc, "org.apache.spark.sql.expressions.Window", "partitionBy", "cyl")
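A plausible explanation is that Window.partitionBy(colName: String, colNames: String*) is a varargs method, so invoke_static never finds a matching signature when handed a single bare string. One workaround I have seen for varargs in sparklyr is to pass the variadic part explicitly as a list(). The following is an untested sketch along those lines: the JVM method names (partitionBy, orderBy, approx_count_distinct, over) are standard Spark API, but the list() dispatch is the part to verify, and dff here is a hypothetical handle to the Spark DataFrame carrying the test column from the concat step above.

# Untested sketch: build the WindowSpec via the JVM API, passing list()
# to fill the String* varargs slot of partitionBy/orderBy.
w <- invoke_static(sc, "org.apache.spark.sql.expressions.Window",
                   "partitionBy", "cyl", list()) %>%
  invoke("orderBy", "cyl", list())

# approx_count_distinct("test") returns a Column; over(w) applies the window.
distinct_col <- invoke_static(sc, "org.apache.spark.sql.functions",
                              "approx_count_distinct", "test") %>%
  invoke("over", w)

# dff: hypothetical Spark DataFrame (via spark_dataframe()) with the test column.
dff %>%
  invoke("withColumn", "distinct", distinct_col) %>%
  sdf_register()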

Some pointers would be very helpful!

This should get you going (note that windowPartitionBy(), orderBy(), and over() here come from the SparkR API rather than sparklyr):

w <- orderBy(windowPartitionBy("cyl"), "cyl")
dff <- select(dff, over(approx_count_distinct("test"), w))
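For completeness, here is a self-contained sketch under the assumption that SparkR is attached and a SparkSession is active. It is untested, and approx_count_distinct() is given a Column rather than a string, since that matches the documented SparkR signature:

library(SparkR)

# Assumes an active SparkSession with the mtcars table registered.
df <- sql("select * from mtcars")
dff <- withColumn(df, "test", concat(df$gear, lit(" "), df$carb))

# Window partitioned and ordered by cyl, as in the pyspark snippet.
w <- orderBy(windowPartitionBy("cyl"), "cyl")
showDF(select(dff, over(approx_count_distinct(dff$test), w)))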

You can pipe SQL straight through:

mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
  dplyr::mutate(test = paste0(gear, " ", carb)) %>%
  dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))
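To double-check what gets sent to Spark, show_query() (standard dplyr/dbplyr) prints the SQL generated for the pipeline:

# Inspect the generated SQL, including the OVER (PARTITION BY cyl) clause.
mtcars_spk2 %>% dplyr::show_query()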

It's worth noting that this is a rare case where raw SQL is needed: sparklyr supports most other window functions natively. If you just want a count or a minimum (say, min(disp)) partitioned by cyl, you can do that easily:

mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk <- mtcars_spk %>%
  group_by(cyl) %>%
  arrange(cyl) %>%
  mutate(cnt = n(),
         mindis = min(disp))
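Pulling a handful of rows back to R is then an easy way to eyeball the windowed columns (head() and collect() are the usual sparklyr idioms):

# Bring a small sample back to R to inspect cnt and mindis per cyl group.
mtcars_spk %>% head(10) %>% collect()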

Links on similar topics: