How to use EventHubsForeachWriter in Azure Databricks

I'm trying to use EventHubsForeachWriter as shown here:

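import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.eventhubs.EventHubsForeachWriter
import org.apache.spark.sql.streaming.Trigger

val ehConf = EventHubsConf("YOUR_CONNECTION_STRING")
val writer = EventHubsForeachWriter(ehConf)

val query =
  streamingSelectDF                  // a streaming DataFrame, i.e. Dataset[Row]
    .writeStream
    .foreach(writer)                 // rejected by the compiler (see error below)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("25 seconds"))
    .start()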

But I get a compile error:

notebook:22: error: type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
    .foreach(writer)

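I found the answer in this GitHub issue: EventHubsForeachWriter is a ForeachWriter[String], so the stream has to be turned into a Dataset[String] (for example, by serializing each row to a JSON string) before it can be passed to .foreach.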

So I think the following should work:

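import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._              // provides the Encoder[String] used by .as[String]

val query =
  streamingSelectDF
    .select(to_json(struct("*")).as("body"))  // one JSON string per row; to_json already returns a string
    .as[String]                               // Dataset[String] matches ForeachWriter[String]
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("25 seconds"))
    .start()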
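The compile error is a plain type mismatch: EventHubsForeachWriter is a ForeachWriter[String], while .foreach on a streaming DataFrame (a Dataset[Row]) expects a ForeachWriter[Row]. The Spark-free sketch below models only that typing rule. ToyWriter, ToyRow, ToyStream and toJson are hypothetical stand-ins, not Spark's API, and the naive JSON encoding only approximates to_json(struct("*")).

```scala
// Toy model of the typing rule behind the error. ToyWriter, ToyRow, ToyStream
// and toJson are hypothetical stand-ins, NOT Spark's ForeachWriter, Row or Dataset.
import scala.collection.mutable.ArrayBuffer

object ForeachTypeDemo {
  // Like ForeachWriter[T]: consumes values of exactly type T (invariant in T).
  trait ToyWriter[T] {
    def process(value: T): Unit
  }

  // Stand-in for a Spark Row: column names mapped to values.
  final case class ToyRow(fields: Map[String, Any])

  // Like EventHubsForeachWriter: only accepts String bodies and records them.
  final class StringWriter extends ToyWriter[String] {
    val sent: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def process(value: String): Unit = sent += value
  }

  // Like Dataset[T]: foreach demands a writer for the element type T.
  final class ToyStream[T](items: Seq[T]) {
    def map[U](f: T => U): ToyStream[U] = new ToyStream(items.map(f))
    def foreach(w: ToyWriter[T]): Unit = items.foreach(w.process)
  }

  // Naive JSON encoding with sorted keys (hypothetical; no escaping).
  def toJson(row: ToyRow): String =
    row.fields.toSeq
      .sortBy(_._1)
      .map { case (k, v) =>
        val rendered = v match {
          case s: String => "\"" + s + "\""
          case other     => other.toString
        }
        "\"" + k + "\":" + rendered
      }
      .mkString("{", ",", "}")

  def run(): Seq[String] = {
    val rows   = new ToyStream(Seq(ToyRow(Map("id" -> 1, "name" -> "a"))))
    val writer = new StringWriter
    // rows.foreach(writer)           // does not compile: StringWriter is not a ToyWriter[ToyRow]
    rows.map(toJson).foreach(writer)  // compiles: ToyStream[String] matches ToyWriter[String]
    writer.sent.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Running main prints {"id":1,"name":"a"}. Mapping the rows to strings before calling foreach plays the same role as .select(to_json(struct("*"))).as[String] in the Spark version.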