Event Hub: org.apache.spark.sql.AnalysisException: Required attribute 'body' not found
I am trying to write a Delta change data feed to Azure Event Hubs, reading it as follows:
df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("cdc_test1")
When writing to Azure Event Hubs, the connector expects the content to be in a body attribute:
df.writeStream.format("eventhubs").option("checkpointLocation", checkpointLocation).outputMode("append").options(**ehConf).start()
It throws this exception:
org.apache.spark.sql.AnalysisException: Required attribute 'body' not found.
at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery(EventHubsWriter.scala:53)
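I am not sure how to wrap the whole stream into a body. I think I need another stream object with a single column named body that holds each row of "df" (the original stream) as a string, but I have not been able to achieve that. Please help!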
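You just need to create this column with the struct function (to encode all columns as one object) and something like to_json (to create a single value from that object). You can use other functions instead, such as to_csv or to_avro, but that depends on the contract with the consumers. The code could look like this: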
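from pyspark.sql import functions as F

# Pack all columns into one struct, serialize it to a JSON string, and name it "body"
df.select(F.to_json(F.struct("*")).alias("body")) \
    .writeStream.format("eventhubs") \
    .option("checkpointLocation", checkpointLocation) \
    .outputMode("append") \
    .options(**ehConf).start()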