SparkSQL (Spark 1.3) UDF for operations on Date

I have a DataFrame with two string columns that contain date information (e.g. "2014-01-01"). I would like to operate on such columns, for example converting them to a date format and subtracting dates. I tried to define a UDF using material I found on the internet, such as the following:

import java.util.Date
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))

But when I use it, I get the following error:

scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.functions$.udf(functions.scala:402)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC.<init>(<console>:55)
    at $iwC.<init>(<console>:57)
    at <init>(<console>:59)
    at .<init>(<console>:63)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can you help me solve this? Thanks!

Spark SQL uses java.sql.Timestamp and java.sql.Date to represent timestamps and dates, respectively; java.util.Date won't work here. You can simply extract the milliseconds and pass them to the java.sql.Date constructor.
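For example, here is a minimal sketch of the corrected UDF, reusing the df and dt_string names from the question and assuming the input strings actually match the pattern:

import java.sql.Date
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// MM means month here; the lowercase mm in the question's pattern means minutes
val d = DateTimeFormat.forPattern("yyyy-MM-dd'T'kk:mm:ssZ")

// returning java.sql.Date lets Spark SQL map the result to DateType
val dtFunc: (String => Date) = (arg1: String) =>
  new Date(DateTime.parse(arg1, d).getMillis)

val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))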

In practice, though, I would consider using a HiveContext and Hive UDFs. For example, you can use unix_timestamp with a specified pattern (following SimpleDateFormat) to convert a string to seconds:

df.selectExpr("*",
  """unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")

and use standard type casting to get a date or timestamp.
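For instance, a sketch of what that casting could look like (the output aliases dt_ts, dt_date, and days_between are made up here, and a HiveContext-backed DataFrame is assumed):

// cast the seconds returned by unix_timestamp to a proper timestamp
df.selectExpr("*",
  """CAST(unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss") AS timestamp) AS dt_ts""")

// a plain "yyyy-MM-dd" string such as "2014-01-01" can be cast straight to a date
df.selectExpr("*", "CAST(dt_string AS date) AS dt_date")

// and Hive's datediff subtracts two such date strings
// (dt_string_end / dt_string_start stand in for the question's two columns)
df.selectExpr("datediff(dt_string_end, dt_string_start) AS days_between")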