Databricks:必须使用 writeStream.start() 执行流式源查询

Databricks: Queries with streaming sources must be executed with writeStream.start()

我知道关于此错误消息还有其他几个问题,但 none 似乎与我目前面临的问题有关。我正在从 JSON 文件流式传输(这部分有效):

gamingEventDF = (spark
.readStream
.schema(eventSchema) 
.option('streamName','mobilestreaming_demo') 
.option("maxFilesPerTrigger", 1)
.json(inputPath) 
)

接下来我想使用 writeStream 将其附加到 table:

def writeToBronze(sourceDataframe, bronzePath, streamName):
(sourceDataframe.rdd
.spark
.writeStream.format("delta")
.option("checkpointLocation", bronzePath + "/_checkpoint")
.queryName(streamName)
.outputMode("append") 
.start(bronzePath)
)

当我现在运行:

writeToBronze(gamingEventDF, outputPathBronze, "bronze_stream")

我收到错误:AnalysisException:必须使用 writeStream.start()

执行带有流源的查询

顺便说一句:当我删除 .rdd 时,出现另一个错误('DataFrame' 对象没有属性 'spark')

知道我错了什么吗? 非常感谢

writeStream 方法在数据帧上可用 class 在 SparkSession 上不可用。

以下代码应该适合您。

def writeToBronze(sourceDataframe, bronzePath, streamName):
  (sourceDataframe
  .writeStream.format("delta")
  .option("checkpointLocation", bronzePath + "/_checkpoint")
  .queryName(streamName)
  .outputMode("append") 
  .start(bronzePath)
  .awaitTermination())