
Event Hub: org.apache.spark.sql.AnalysisException: Required attribute 'body' not found

I am trying to write change data capture to Event Hubs like this:

df = spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("cdc_test1")
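For context (this is not in the original post): reading the change feed yields the table's own columns plus Delta's change-tracking metadata columns. A minimal pure-Python sketch of what one change record might look like; the data columns id and name are hypothetical, while _change_type, _commit_version, and _commit_timestamp are the metadata columns Delta's change data feed adds:

```python
# Hypothetical shape of one change-feed record from cdc_test1.
# id and name are made-up data columns; the underscore-prefixed
# columns are added by Delta's change data feed.
sample_change = {
    "id": 1,                                      # data column (hypothetical)
    "name": "alice",                              # data column (hypothetical)
    "_change_type": "insert",                     # insert / update_preimage / update_postimage / delete
    "_commit_version": 1,
    "_commit_timestamp": "2023-01-01T00:00:00Z",
}

# Every record carries the CDF metadata columns alongside the data:
cdf_columns = [c for c in sample_change if c.startswith("_")]
print(cdf_columns)
```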

When writing to Azure Event Hubs, the connector expects the payload in a body attribute:

df.writeStream.format("eventhubs")\
    .option("checkpointLocation", checkpointLocation)\
    .outputMode("append")\
    .options(**ehConf)\
    .start()

This raises the exception:

org.apache.spark.sql.AnalysisException: Required attribute 'body' not found.
    at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery(EventHubsWriter.scala:53)

I am not sure how to wrap the whole stream into a body. I think I need another streaming DataFrame with a single column, body, that holds each row of the original stream (df) as a string, but I haven't been able to build it. Please help!

You just need to create this column using the struct function (to encode all columns as one object) combined with something like to_json (to produce a single value from that object; you can use other functions instead, such as to_csv or to_avro, depending on the contract with the consumer). The code could look like this:

import pyspark.sql.functions as F

df.select(F.to_json(F.struct("*")).alias("body"))\
    .writeStream.format("eventhubs")\
    .option("checkpointLocation", checkpointLocation)\
    .outputMode("append")\
    .options(**ehConf).start()
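To see what ends up in the body column, here is a plain-Python sketch (no Spark required) of the same packing: per row, to_json(struct("*")) folds every column into a single JSON string. The row contents below are made up for illustration:

```python
import json

# A made-up change-feed row; to_json(F.struct("*")) serializes all of a
# row's columns into one JSON string like this, one string per row.
row = {"id": 1, "name": "alice", "_change_type": "insert", "_commit_version": 1}
body = json.dumps(row)
print(body)

# The consumer can decode the body back into the original columns:
decoded = json.loads(body)
assert decoded == row
```

Whatever encoding you pick (JSON, CSV, Avro), the key point is that the Event Hubs writer only looks for a single body column, so all the data has to be serialized into it.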