用于日期操作的 SparkSQL (Spark 1.3) UDF
SparkSQL (Spark 1.3) UDF for operations on Date
我有一个数据框,其中包含两个包含日期信息的字符串列(即“2014-01-01”)。我想对此类列进行操作,例如转换为日期格式,并减去日期。我尝试使用我在互联网上找到的内容来定义 UDF,例如以下内容:
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))
但是当我使用它时,出现以下错误:
scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.functions$.udf(functions.scala:402)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
你能帮我解决这个问题吗?谢谢!
SparkSQL 分别使用 java.sql.Timestamp
和 java.sql.Date
表示时间戳和日期。 java.util.Date
在这里不起作用。您可以简单地提取毫秒数并传递给 java.sql.Date
构造函数。
实际上我会考虑使用 HiveContext
和 Hive UDF。例如,您可以使用具有指定模式的 unix_timestamp
(使用 Simple Date Format)将字符串转换为 seconds
df.selectExpr("*",
"""unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")timestamp)""")
并使用标准类型转换来获取日期或时间戳。
我有一个数据框,其中包含两个包含日期信息的字符串列(即“2014-01-01”)。我想对此类列进行操作,例如转换为日期格式,并减去日期。我尝试使用我在互联网上找到的内容来定义 UDF,例如以下内容:
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
val d = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ssZ")
val dtFunc: (String => Date) = (arg1: String) => DateTime.parse(arg1, d).toDate
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("dt_string")))
但是当我使用它时,出现以下错误:
scala.MatchError: java.util.Date (of class scala.reflect.internal.Types$TypeRef$$anon)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:107)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at org.apache.spark.sql.functions$.udf(functions.scala:402)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
你能帮我解决这个问题吗?谢谢!
SparkSQL 分别使用 java.sql.Timestamp
和 java.sql.Date
表示时间戳和日期。 java.util.Date
在这里不起作用。您可以简单地提取毫秒数并传递给 java.sql.Date
构造函数。
实际上我会考虑使用 HiveContext
和 Hive UDF。例如,您可以使用具有指定模式的 unix_timestamp
(使用 Simple Date Format)将字符串转换为 seconds
df.selectExpr("*",
"""unix_timestamp(dt_string, "yyyy-MM-dd'T'kk:mm:ss")""")timestamp)""")
并使用标准类型转换来获取日期或时间戳。