使 Spark 函数可以从 mutate 中的定制函数中访问
Making Spark functions accessible from within a bespoke function in mutate
在通过 sparklyr 使用 Spark RDD 时,我想包装一些常见的转换,以便更方便地将它们传递给 mutate
语法。
例子
例如,在处理具有以下时间戳的数据时:
2000-01-01 00:00:00.0
2000-02-02 00:00:00.0
我可以使用以下语法将它们转换为更有用的 YYYY-MM-dd
格式:
mutate(nice_date= from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))
挑战
因为我经常这样做,所以我想包装 from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))
调用并使用语法:
mutate(nice_date = from_unix_to_nice(bad_date))
传统方法建议写一个函数:
from_unix_to_nice<- function(x) {
from_unixtime(unix_timestamp(x), 'YYYY-MM-dd')
}
问题
应用函数时失败:
> Error: org.apache.spark.sql.AnalysisException: undefined function
> from_unix_to_nice; line 2 pos 62 at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
> at scala.Option.getOrElse(Option.scala:120)
如何方便地为常见的变异操作开发包装器,以便将它们传递给 sparklyr 管道?
问题是该函数需要未经计算就传递给 mutate()
函数。 rlang
包可以完成这个,下面是一个例子:
library(rlang)
library(sparklyr)
library(nycflights13)
library(dplyr)
sc <- spark_connect(master = "local")
just_time <- flights %>%
select(time_hour) %>%
mutate(time_hour = as.character(time_hour))
head(100)
spark_flights <- copy_to(sc, just_time, "flights")
from_unix_to_nice<- function(x) {
x <- enexpr(x)
expr(from_unixtime(unix_timestamp(!!x), 'YYYY-MM-dd'))
}
from_unix_to_nice(test)
spark_flights %>%
mutate(new_field = !!from_unix_to_nice(time_hour))
from_unix_to_nice()
函数现在传递:from_unixtime(unix_timestamp(test), "YYYY-MM-dd")
到 mutate()
,就好像您输入了完全相同的语法一样。
在通过 sparklyr 使用 Spark RDD 时,我想包装一些常见的转换,以便更方便地将它们传递给 mutate
语法。
例子
例如,在处理具有以下时间戳的数据时:
2000-01-01 00:00:00.0
2000-02-02 00:00:00.0
我可以使用以下语法将它们转换为更有用的 YYYY-MM-dd
格式:
mutate(nice_date= from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))
挑战
因为我经常这样做,所以我想包装 from_unixtime(unix_timestamp(bad_timestamp), 'YYYY-MM-dd'))
调用并使用语法:
mutate(nice_date = from_unix_to_nice(bad_date))
传统方法建议写一个函数:
from_unix_to_nice<- function(x) {
from_unixtime(unix_timestamp(x), 'YYYY-MM-dd')
}
问题
应用函数时失败:
> Error: org.apache.spark.sql.AnalysisException: undefined function
> from_unix_to_nice; line 2 pos 62 at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
> at
> org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$$anonfun.apply(hiveUDFs.scala:69)
> at scala.Option.getOrElse(Option.scala:120)
如何方便地为常见的变异操作开发包装器,以便将它们传递给 sparklyr 管道?
问题是该函数需要未经计算就传递给 mutate()
函数。 rlang
包可以完成这个,下面是一个例子:
library(rlang)
library(sparklyr)
library(nycflights13)
library(dplyr)
sc <- spark_connect(master = "local")
just_time <- flights %>%
select(time_hour) %>%
mutate(time_hour = as.character(time_hour))
head(100)
spark_flights <- copy_to(sc, just_time, "flights")
from_unix_to_nice<- function(x) {
x <- enexpr(x)
expr(from_unixtime(unix_timestamp(!!x), 'YYYY-MM-dd'))
}
from_unix_to_nice(test)
spark_flights %>%
mutate(new_field = !!from_unix_to_nice(time_hour))
from_unix_to_nice()
函数现在传递:from_unixtime(unix_timestamp(test), "YYYY-MM-dd")
到 mutate()
,就好像您输入了完全相同的语法一样。