Reusing an Event Hub stream for multiple queries in Azure Databricks
In Azure Databricks Structured Streaming (Scala notebook, connected to Azure IoT Hub), I open a stream on the Event Hub-compatible endpoint of the IoT Hub. I then parse the incoming stream against a structured schema and create three queries (groupBy) on that same stream.
Most of the time (though apparently not always), one of the display queries fails with an exception about the epoch value on a partition (see below).
I am using a dedicated consumer group that no other application reads from. So I would expect that opening one stream and running multiple streaming queries against it is supported?
Any suggestions, explanations, or ideas on how to solve this? (I would like to avoid having to create three consumer groups and define the stream three times.)
An example of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 3 in stage 1064.0 failed 4 times, most recent failure: Lost task
3.3 in stage 1064.0 (TID 24790, 10.139.64.10, executor 7): java.util.concurrent.CompletionException:
com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New
receiver with higher epoch of '0' is created hence current receiver
with epoch '0' is getting disconnected. If you are recreating the
receiver, make sure a higher epoch is used. TrackingId:xxxx,
SystemTracker:iothub-name|databricks-db,
Timestamp:2019-02-18T15:25:19, errorContext[NS: yyy, PATH:
savanh-traffic-camera2/ConsumerGroups/databricks-db/Partitions/3,
REFERENCE_ID: a0e445_7319_G2_1550503505013, PREFETCH_COUNT: 500,
LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
Here is my code (sanitized):
// Required imports (not shown in the original snippet)
import org.apache.spark.eventhubs._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Define schema and create incoming camera eventstream
val cameraEventSchema = new StructType()
  .add("TrajectId", StringType)
  .add("EventTime", StringType)
  .add("Country", StringType)
  .add("Make", StringType)

val iotHubParameters =
  EventHubsConf(cameraHubConnectionString)
    .setConsumerGroup("databricks-db")
    .setStartingPosition(EventPosition.fromEndOfStream)

val incomingStream = spark.readStream.format("eventhubs").options(iotHubParameters.toMap).load()

// Define parsing query selecting the required properties from the incoming telemetry data
val cameraMessages =
  incomingStream
    .withColumn("Offset", $"offset".cast(LongType))
    .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
    .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
    .withColumn("Body", $"body".cast(StringType))
    // Select the event hub fields so we can work with them
    .select("Offset", "Time (readable)", "Timestamp", "Body")
    // Parse the "Body" column as JSON using the schema we defined above
    .select(from_json($"Body", cameraEventSchema) as "cameraevents")
    // Now select the values from our JSON structure and cast them manually to avoid problems
    .select(
      $"cameraevents.TrajectId".cast("string").alias("TrajectId"),
      $"cameraevents.EventTime".cast("timestamp").alias("EventTime"),
      $"cameraevents.Country".cast("string").alias("Country"),
      $"cameraevents.Make".cast("string").alias("Make")
    )
    .withWatermark("EventTime", "10 seconds")

val groupedDataFrame =
  cameraMessages
    .groupBy(window($"EventTime", "5 seconds") as 'window)
    .agg(count("*") as 'count)
    .select($"window".getField("start") as 'window, $"count")

display(groupedDataFrame)

val makeDataFrame =
  cameraMessages
    .groupBy("Make")
    .agg(count("*") as 'count)
    .sort($"count".desc)

display(makeDataFrame)

val countryDataFrame =
  cameraMessages
    .groupBy("Country")
    .agg(count("*") as 'count)
    .sort($"count".desc)

display(countryDataFrame)
You can store the streaming data to a table or a file location and then run multiple queries against that table or file; they will all run in real time. For a file you have to specify the schema when reading the data back into a DataFrame, so it is easier to write the streaming data to a table.
cameraMessages.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/data/events/_checkpoints/data_file")
  .table("events")
Now you can execute queries on the table 'events'. To get it back as a DataFrame:
val cameraMessages = spark.readStream.table("events")
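For illustration, a minimal sketch that reuses the aggregations from the question: the three display queries can now all run against this DataFrame, while only the single write stream above holds a receiver on the Event Hub-compatible endpoint.
// Each query reads from the Delta table instead of opening its own Event Hub receiver
display(cameraMessages.groupBy(window($"EventTime", "5 seconds") as 'window).agg(count("*") as 'count))
display(cameraMessages.groupBy("Make").agg(count("*") as 'count).sort($"count".desc))
display(cameraMessages.groupBy("Country").agg(count("*") as 'count).sort($"count".desc))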
I ran into the same issue when using Event Hubs, and the approach above worked for me.
To use a file instead of a table:
// Write/append the streaming data to files
cameraMessages.writeStream
  .format("parquet")
  .outputMode("append")
  .option("checkpointLocation", "/FileStore/StreamCheckPoint.parquet")
  .option("path", "/FileStore/StreamData")
  .start()

// Read the data back from the files; a schema must be specified for a file source
import org.apache.spark.sql.types._

val Schema = new StructType()
  .add(StructField("TrajectId", StringType))
  .add(StructField("EventTime", TimestampType))
  .add(StructField("Country", StringType))
  .add(StructField("Make", StringType))

val cameraMessages =
  spark.readStream
    // For a file source the per-trigger limit is expressed in files, not events
    .option("maxFilesPerTrigger", 1)
    .schema(Schema)
    .parquet("/FileStore/StreamData")
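As with the table-based approach, the original queries can then be pointed at this file-backed stream. A small sketch, reusing the Make aggregation from the question:
// Example query against the file-backed stream; no extra Event Hub receiver is created
display(
  cameraMessages
    .groupBy("Make")
    .agg(count("*") as 'count)
    .sort($"count".desc)
)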