使 Spark 函数可以从 mutate 中的定制函数中访问

Making Spark functions accessible from within a bespoke function in mutate

在通过 使用 Spark RDD 时,我想包装一些常见的转换,以便更方便地将它们传递给 mutate 语法。

例子

例如,在处理具有以下时间戳的数据时:

2000-01-01 00:00:00.0
2000-02-02 00:00:00.0

我可以使用以下语法将它们转换为更有用的 YYYY-MM-dd 格式:

mutate(nice_date= from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))

挑战

因为我经常这样做,所以我想包装 from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd')) 调用并使用语法:

mutate(nice_date = from_unix_to_nice(bad_date))

传统方法建议写一个函数:

from_unix_to_nice<- function(x) {
    from_unixtime(unix_timestamp(x), 'YYYY-MM-dd')
}

问题

应用函数时失败:

> Error: org.apache.spark.sql.AnalysisException: undefined function
> from_unix_to_nice; line 2 pos 62  at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
>   at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
>   at scala.Option.getOrElse(Option.scala:120)

如何方便地为常见的变异操作开发包装器,以便将它们传递给 sparklyr 管道?

问题是该函数需要未经计算就传递给 mutate() 函数。 rlang 包可以完成这个,下面是一个例子:

library(rlang)
library(sparklyr) 
library(nycflights13)
library(dplyr)

sc <- spark_connect(master = "local")

just_time <- flights %>%
     select(time_hour) %>%
     mutate(time_hour = as.character(time_hour))
     head(100)

spark_flights <- copy_to(sc, just_time, "flights")


from_unix_to_nice<- function(x) {
  x <- enexpr(x)
  expr(from_unixtime(unix_timestamp(!!x), 'YYYY-MM-dd'))
}

from_unix_to_nice(test)


spark_flights %>%
  mutate(new_field =  !!from_unix_to_nice(time_hour))

from_unix_to_nice() 函数现在传递:from_unixtime(unix_timestamp(test), "YYYY-MM-dd")mutate(),就好像您输入了完全相同的语法一样。