Getting "Task not serializable: java.io.NotSerializableException" while trying to use udf and to_json together
I have been trying to figure out exactly what is going wrong, but I haven't been able to. I also tried following a similar approach, but still couldn't pin down the problem.
Below is my code snippet:
val writingDataset = sparkSession
  .readStream
  .format("kafka")
  .option(kafkaBootstrapServers, urls)
  .option("subscribe", inputTopics)
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  // .withColumn("value", parser.parseUDF('value).as("value")) // this combination of two lines doesn't work either
  // .withColumn("value", to_json('value).as("value"))         // this combination of two lines doesn't work either
  .select(col("key"), to_json(parser.parseUDF('value)).as("value"))
  .writeStream
  .format("console")
  .start()

writingDataset.awaitTermination()
Below is my UDF code:
val parse = (value: String) => {
  Some(CompanyDetail("something", "something"))
}

import org.apache.spark.sql.functions.udf

val parseUDF = udf(parse)
val keyUDF = udf(keyParse)
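For context, CompanyDetail is not shown in the question. For udf plus to_json to work at all, it would need to be a plain case class (which Spark maps to a struct column), presumably something along these lines; the field names here are assumptions:

// Hypothetical sketch only -- the real fields are not shown in the question.
// A case class is Serializable and is mapped to a StructType, so
// to_json(parseUDF('value)) can encode the UDF result as a JSON string.
case class CompanyDetail(name: String, detail: String)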
I am not sure what is going on here, but I keep getting the following error:
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$Lambda51/0000000000000000.apply(Unknown Source)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:155)
at org.apache.spark.sql.execution.SparkPlan$$Lambda80/0000000000000000.apply(Unknown Source)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset.$anonfun$collect(Dataset.scala:2782)
at org.apache.spark.sql.Dataset$$Lambda66/000000006C38DB10.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset$$Lambda69/000000006C38F210.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda40/000000006C25F080.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:540)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda36/000000006C25E1B0.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda40/000000006C25F080.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:536)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda35/000000006C25DA80.apply(Unknown Source)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda95/000000006C02DE80.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda93/000000006C02CF10.apply$mcZ$sp(Unknown Source)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.runtime.LazyRef
I figured it out myself. There was nothing wrong with the Spark code; it was a Scala version issue. Once I downgraded the Scala version to 2.11.0, it worked. The scala.runtime.LazyRef mentioned in the stack trace appears to come from the lambda encoding of newer Scala compilers, so the error points at a mismatch between the Scala version the code was compiled with and the one the Spark distribution was built for.
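For anyone hitting the same error, the fix boils down to keeping the Scala binary version of the build aligned with the Spark artifacts. A minimal build.sbt sketch under that assumption (the exact Spark version is not stated in the question, so 2.4.4 here is only an example):

// build.sbt -- versions are illustrative; the key point is that scalaVersion
// matches the Scala binary version the Spark distribution was built for.
scalaVersion := "2.11.12" // the answer used 2.11.0; any matching 2.11.x should behave the same

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.11) to the artifact name
  "org.apache.spark" %% "spark-sql"            % "2.4.4",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4"
)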