Databricks:必须使用 writeStream.start() 执行流式源查询
Databricks: Queries with streaming sources must be executed with writeStream.start()
我知道关于此错误消息还有其他几个问题,但 none 似乎与我目前面临的问题有关。我正在从 JSON 文件流式传输(这部分有效):
gamingEventDF = (spark
.readStream
.schema(eventSchema)
.option('streamName','mobilestreaming_demo')
.option("maxFilesPerTrigger", 1)
.json(inputPath)
)
接下来我想使用 writeStream 将其附加到 table:
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe.rdd
.spark
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
)
当我现在运行:
writeToBronze(gamingEventDF, outputPathBronze, "bronze_stream")
我收到错误:AnalysisException:必须使用 writeStream.start()
执行带有流源的查询
顺便说一句:当我删除 .rdd 时,出现另一个错误('DataFrame' 对象没有属性 'spark')
知道我错了什么吗?
非常感谢
writeStream 方法在数据帧上可用 class 在 SparkSession 上不可用。
以下代码应该适合您。
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
.awaitTermination())
我知道关于此错误消息还有其他几个问题,但 none 似乎与我目前面临的问题有关。我正在从 JSON 文件流式传输(这部分有效):
gamingEventDF = (spark
.readStream
.schema(eventSchema)
.option('streamName','mobilestreaming_demo')
.option("maxFilesPerTrigger", 1)
.json(inputPath)
)
接下来我想使用 writeStream 将其附加到 table:
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe.rdd
.spark
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
)
当我现在运行:
writeToBronze(gamingEventDF, outputPathBronze, "bronze_stream")
我收到错误:AnalysisException:必须使用 writeStream.start()
顺便说一句:当我删除 .rdd 时,出现另一个错误('DataFrame' 对象没有属性 'spark')
知道我错了什么吗? 非常感谢
writeStream 方法在数据帧上可用 class 在 SparkSession 上不可用。
以下代码应该适合您。
def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append")
.start(bronzePath)
.awaitTermination())