结构化流式传输 - java.util.NoSuchElementException:找不到密钥
Structured Streaming - java.util.NoSuchElementException: key not found
我们最近开始在 Azure 数据块上进行结构化流式处理。
目前我们正在使用来自事件中心的事件并将它们作为镶木地板写入 azure datalake 存储。
我能够将流写入控制台,但是当我们尝试将它们写入任何物理存储(Blob/Azure Datalake)时遇到错误"java.util.NoSuchElementException: key not found:"
val schema = new StructType()
.add("col1",StringType, nullable = true)
.add("col2", StringType, nullable = true)
.add("col3", StringType, nullable = true)
.add("col4",StringType, nullable = true)
val messages = incomingStream.selectExpr("offset","partitionKey","cast (body as string) AS Content")
val structuredMsg = messages.select($"offset",$"partitionKey",from_json(col("Content"),schema).alias("data"))
val results = structuredMsg.
select($"offset",$"partitionKey",current_date().as("date_1"),$"data.col1".as("col1"),$"data.col2".as("col2"),$"data.col3".as("col3"),$"data.col4".as("col4"))
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
results.
withColumn("date", $"date_1").
writeStream.
format("text"). // write as Parquet partitioned by date
partitionBy("date").
option("path", "dbfs:/mnt/datalake/XXX-databricks-mount/XXX-databricks/test").
option("checkpointLocation", "dbfs:/checkpoint_path/").
trigger(Trigger.ProcessingTime(60.seconds)).
outputMode(OutputMode.Append).
start
java.util.NoSuchElementException: key not found: {"ehName":"test1","partitionId":1}
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$getBatch.apply(EventHubsSource.scala:233)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$getBatch.apply(EventHubsSource.scala:231)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.eventhubs.EventHubsSource.getBatch(EventHubsSource.scala:231)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:394)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
从 writeStream 语句和代码中单独分配了 sparkSession 检查点路径。
spark.conf.set("spark.sql.streaming.checkpointLocation", "dbfs:/checkpoint_path/");
results.writeStream.outputMode("append").format("csv").option("path", "dbfs:/mnt/datalake/XXX-databricks-mount/XXX-databricks/test").start().awaitTermination()
您可能对多个流式传输作业使用了相同的检查点位置。一个流开始写入,另一个流试图读取它并解释将导致错误的条目。
我有同样的问题从 2 个事件中心读取并使用相同的检查点位置,这导致我的第二份工作尝试从不存在的 topic/partition 组合中读取。
我们最近开始在 Azure 数据块上进行结构化流式处理。 目前我们正在使用来自事件中心的事件并将它们作为镶木地板写入 azure datalake 存储。
我能够将流写入控制台,但是当我们尝试将它们写入任何物理存储(Blob/Azure Datalake)时遇到错误"java.util.NoSuchElementException: key not found:"
val schema = new StructType()
.add("col1",StringType, nullable = true)
.add("col2", StringType, nullable = true)
.add("col3", StringType, nullable = true)
.add("col4",StringType, nullable = true)
val messages = incomingStream.selectExpr("offset","partitionKey","cast (body as string) AS Content")
val structuredMsg = messages.select($"offset",$"partitionKey",from_json(col("Content"),schema).alias("data"))
val results = structuredMsg.
select($"offset",$"partitionKey",current_date().as("date_1"),$"data.col1".as("col1"),$"data.col2".as("col2"),$"data.col3".as("col3"),$"data.col4".as("col4"))
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
results.
withColumn("date", $"date_1").
writeStream.
format("text"). // write as Parquet partitioned by date
partitionBy("date").
option("path", "dbfs:/mnt/datalake/XXX-databricks-mount/XXX-databricks/test").
option("checkpointLocation", "dbfs:/checkpoint_path/").
trigger(Trigger.ProcessingTime(60.seconds)).
outputMode(OutputMode.Append).
start
java.util.NoSuchElementException: key not found: {"ehName":"test1","partitionId":1}
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$getBatch.apply(EventHubsSource.scala:233)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$getBatch.apply(EventHubsSource.scala:231)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.eventhubs.EventHubsSource.getBatch(EventHubsSource.scala:231)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:394)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
从 writeStream 语句和代码中单独分配了 sparkSession 检查点路径。
spark.conf.set("spark.sql.streaming.checkpointLocation", "dbfs:/checkpoint_path/");
results.writeStream.outputMode("append").format("csv").option("path", "dbfs:/mnt/datalake/XXX-databricks-mount/XXX-databricks/test").start().awaitTermination()
您可能对多个流式传输作业使用了相同的检查点位置。一个流开始写入,另一个流试图读取它并解释将导致错误的条目。
我有同样的问题从 2 个事件中心读取并使用相同的检查点位置,这导致我的第二份工作尝试从不存在的 topic/partition 组合中读取。