Scala UDF fails when called from a SELECT statement in DataBricks / Spark
I have the following Scala function, registered as a UDF in Azure DataBricks (Spark 2.3.1, Scala 2.11):
import org.joda.time.DateTime

// Hour-of-year slot for a timestamp: (dayOfYear - 1) * 24 + hourOfDay
val slot = (dt: DateTime) => (dt.dayOfYear().get() - 1) * 24 + dt.hourOfDay().get()

// Slots covered by t itself, t - offset seconds, and t + offset seconds
val compute_slot = (t: String, offset: Int) => {
  val dt: DateTime = DateTime.parse(t)
  Set(slot(dt), slot(dt.minusSeconds(offset)), slot(dt.plusSeconds(offset))).toArray
}
spark.udf.register("get_slot", compute_slot)
Calling the function directly works fine...
compute_slot("2018-11-01T05:04:33.827+0000", 1800)
...as does calling it as a UDF from SQL:
%sql
SELECT explode(get_slot(cast("2018-11-01T05:04:33.827+0000" as string), 1800))
But using the UDF in a Spark SQL query over a table...
%sql
SELECT * FROM tab LATERAL VIEW explode(get_slot(cast(timestamp as string), 1800)) my_view
...or through the DataFrame API:
import org.apache.spark.sql.functions.{lit, udf}

val ColUDF = udf(compute_slot)
df.withColumn("arr", ColUDF($"timestamp", lit(1800))).show()
...fails with the error message below. The complaint about a bad date format puzzles me, since the function runs fine outside a SELECT statement against my table:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 177.0 failed 4 times, most recent failure: Lost task 0.3 in stage 177.0 (TID 33992, 10.139.64.4, executor 4): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (string, int) => array<int>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:620)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
at org.apache.spark.sql.execution.collect.Collector$$anonfun.apply(Collector.scala:126)
at org.apache.spark.sql.execution.collect.Collector$$anonfun.apply(Collector.scala:125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-11-01 05:04:33.827" is malformed at " 05:04:33.827"
at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
at org.joda.time.DateTime.parse(DateTime.java:160)
at org.joda.time.DateTime.parse(DateTime.java:149)
at line7c3ca3974ac14b88a9a351882d40a949342.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(command-1611582591913421:11)
at line7c3ca3974ac14b88a9a351882d40a949342.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(command-1611582591913421:10)
... 12 more
Any help figuring out what I am doing wrong would be much appreciated!
Many thanks!
Paul
By default, parse uses ISODateTimeFormat, whose pattern is defined as:
datetime = time | date-opt-time
time = 'T' time-element [offset]
date-opt-time = date-element ['T' [time-element] [offset]]
date-element = std-date-element | ord-date-element | week-date-element
std-date-element = yyyy ['-' MM ['-' dd]]
ord-date-element = yyyy ['-' DDD]
week-date-element = xxxx '-W' ww ['-' e]
time-element = HH [minute-element] | [fraction]
minute-element = ':' mm [second-element] | [fraction]
second-element = ':' ss [fraction]
fraction = ('.' | ',') digit+
offset = 'Z' | (('+' | '-') HH [':' mm [':' ss [('.' | ',') SSS]]])
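You can see the separator requirement directly in the REPL (a quick sketch; the second call throws exactly the exception from the stack trace above):

scala> import org.joda.time.DateTime
import org.joda.time.DateTime

scala> DateTime.parse("2018-11-01T05:04:33.827")   // 'T' separator, offset omitted: parses fine, offset is optional

scala> DateTime.parse("2018-11-01 05:04:33.827")   // space separator: java.lang.IllegalArgumentException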
Note that a time may only follow the date after a literal 'T'. The string your UDF actually receives, "2018-11-01 05:04:33.827", separates date and time with a space (and carries no offset), so it matches none of these patterns. To parse 2018-11-01 05:04:33.827 you should supply the format yourself:
scala> import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormat
scala> DateTime.parse("2018-11-01 05:04:33.827", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS"));
res3: org.joda.time.DateTime = 2018-11-01T05:04:33.827+01:00
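Folding this back into the UDF, here is a minimal sketch that keeps the default ISO parser and falls back to an explicit pattern for the space-separated strings a timestamp-to-string cast produces (the fallback approach and the castFormat name are my additions; slot is reused from the question):

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Pattern for the string form "2018-11-01 05:04:33.827". There is no zone
// in the pattern, so these strings parse in the JVM default time zone.
val castFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")

val compute_slot = (t: String, offset: Int) => {
  val dt =
    try DateTime.parse(t)   // ISO strings such as "2018-11-01T05:04:33.827+0000"
    catch { case _: IllegalArgumentException => DateTime.parse(t, castFormat) }
  Set(slot(dt), slot(dt.minusSeconds(offset)), slot(dt.plusSeconds(offset))).toArray
}
spark.udf.register("get_slot", compute_slot)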
Your timestamp column is not in the format your UDF expects. The error says:
Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-11-01 05:04:33.827" is malformed at " 05:04:33.827"
Moreover, this fails:
get_slot(cast(timestamp as string), 1800)
...while this succeeds:
get_slot(cast("2018-11-01T05:04:33.827+0000" as string), 1800)
So it is the stringified timestamp column that is in the wrong format: casting a Spark timestamp to string produces "2018-11-01 05:04:33.827", with a space where the ISO parser wants a 'T'. Just put the T part back and you will be fine.
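For instance, instead of cast(timestamp as string), you could format the column into the ISO shape the UDF already accepts (a sketch, assuming timestamp is a TimestampType column of tab):

%sql
SELECT * FROM tab
LATERAL VIEW explode(get_slot(date_format(timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSSZ"), 1800)) my_view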