Calling Spark window functions in R using sparklyr
I have been trying to replicate the following pyspark snippet in sparklyr, without success:
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, col, lit, approx_count_distinct, countDistinct
df = spark.sql("select * from mtcars")
dff = df.withColumn("test", concat(col("gear"), lit(" "), col("carb")))
w = Window.partitionBy("cyl").orderBy("cyl")
dff.withColumn("distinct", approx_count_distinct("test").over(w)).show()
I did manage to get the concat bit working like this:
tbl(sc, "mtcars") %>%
  spark_dataframe() %>%
  invoke("withColumn",
         "concat",
         invoke_static(sc, "org.apache.spark.sql.functions", "expr", "concat(gear, carb)")) %>%
  sdf_register()
What I can't figure out is how to invoke Window.partitionBy() and Window.orderBy():
# Doesn't work
w <- invoke_static(sc, "org.apache.spark.sql.expressions.Window", "partitionBy", "cyl")
Some pointers would be much appreciated!
This should get you going (note these are SparkR functions, not sparklyr):

w <- orderBy(windowPartitionBy("cyl"), "cyl")
dff <- select(dff, over(approx_count_distinct("test"), w))
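If you would rather stay with the invoke API from the question, the likely stumbling block is that `Window.partitionBy(col: String, cols: String*)` ends in a Scala varargs parameter, which sparklyr's invoke layer expects as an explicit list. A sketch (untested, assuming a live connection `sc` with mtcars registered as in the question):

```r
library(sparklyr)
library(dplyr)

# Sketch: build a WindowSpec through the JVM API. partitionBy/orderBy have
# signature (String, String*), so the trailing varargs is passed as an
# empty list() -- omitting it is why the bare invoke_static call fails.
w <- invoke_static(sc, "org.apache.spark.sql.expressions.Window",
                   "partitionBy", "cyl", list()) %>%
  invoke("orderBy", "cyl", list())

tbl(sc, "mtcars") %>%
  spark_dataframe() %>%
  invoke("withColumn", "test",
         invoke_static(sc, "org.apache.spark.sql.functions", "expr",
                       "concat(gear, ' ', carb)")) %>%
  invoke("withColumn", "distinct",
         invoke_static(sc, "org.apache.spark.sql.functions",
                       "approx_count_distinct", "test") %>%
           invoke("over", w)) %>%
  sdf_register()
```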
You can pipe the SQL through directly:
mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
  dplyr::mutate(test = paste0(gear, " ", carb)) %>%
  dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))
It's worth noting that this is a rare case; sparklyr supports most other window functions directly. If you just want, say, a count or min(disp) partitioned by cyl, you can do that easily:
mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk <- mtcars_spk %>%
  group_by(cyl) %>%
  arrange(cyl) %>%
  mutate(cnt = n(),          # n() translates to COUNT(*) over the group
         mindis = min(disp))
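To confirm that the grouped mutate compiles to window functions (PARTITION BY) rather than a collapsing aggregation, you can ask dplyr for the generated SQL; a sketch, assuming show_query() as re-exported from dbplyr:

```r
# Sketch: inspect the SQL sparklyr generates for the grouped mutate.
# Each aggregate should render as a window expression,
# e.g. MIN(disp) OVER (PARTITION BY cyl).
mtcars_spk %>%
  group_by(cyl) %>%
  mutate(cnt = n(),
         mindis = min(disp)) %>%
  dplyr::show_query()
```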