Databricks:未找到 Azure 队列存储结构化流密钥错误
Databricks: Azure Queue Storage structured streaming key not found error
我正在尝试为 AQS 流数据编写 ETL 管道。这是我的代码
CONN_STR = dbutils.secrets.get(scope="kvscope", key = "AZURE-STORAGE-CONN-STR")
schema = StructType([
StructField("id", IntegerType()),
StructField("parkingId", IntegerType()),
StructField("capacity", IntegerType()),
StructField("freePlaces", IntegerType()),
StructField("insertTime", TimestampType())
])
stream = spark.readStream \
.format("abs-aqs") \
.option("fileFormat", "json") \
.option("queueName", "freeparkingplaces") \
.option("connectionString", CONN_STR) \
.schema(schema) \
.load()
display(stream)
当我 运行 我得到 java.util.NoSuchElementException: key not found: eventType
这是我的队列的样子
你能找出问题所在并解释一下吗?
abs-aqs
连接器不是用于使用 AQS 中的数据,而是用于使用报告给 AQS 的事件获取有关 blob 存储中新文件的数据。这就是您指定文件格式选项和架构的原因 - 但这些参数将应用于文件,而不是 AQS 中的消息。
据我所知(我可能是错的),AQS 没有 Spark 连接器,通常建议使用 EventHubs 或 Kafka 作为消息传递解决方案。
我正在尝试为 AQS 流数据编写 ETL 管道。这是我的代码
CONN_STR = dbutils.secrets.get(scope="kvscope", key = "AZURE-STORAGE-CONN-STR")
schema = StructType([
StructField("id", IntegerType()),
StructField("parkingId", IntegerType()),
StructField("capacity", IntegerType()),
StructField("freePlaces", IntegerType()),
StructField("insertTime", TimestampType())
])
stream = spark.readStream \
.format("abs-aqs") \
.option("fileFormat", "json") \
.option("queueName", "freeparkingplaces") \
.option("connectionString", CONN_STR) \
.schema(schema) \
.load()
display(stream)
当我 运行 我得到 java.util.NoSuchElementException: key not found: eventType
这是我的队列的样子
你能找出问题所在并解释一下吗?
abs-aqs
连接器不是用于使用 AQS 中的数据,而是用于使用报告给 AQS 的事件获取有关 blob 存储中新文件的数据。这就是您指定文件格式选项和架构的原因 - 但这些参数将应用于文件,而不是 AQS 中的消息。
据我所知(我可能是错的),AQS 没有 Spark 连接器,通常建议使用 EventHubs 或 Kafka 作为消息传递解决方案。