How to use EventHubsForeachWriter in Azure Databricks
I'm trying to use EventHubsForeachWriter, as shown here:
// Imports for the azure-event-hubs-spark connector and Spark 2.x
// (EventHubsForeachWriter's package is confirmed by the error below):
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.eventhubs.EventHubsForeachWriter
import org.apache.spark.sql.streaming.ProcessingTime

val ehConf = EventHubsConf("YOUR_CONNECTION_STRING")
val writer = EventHubsForeachWriter(ehConf)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()
But I get this exception:
notebook:22: error: type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
       .foreach(writer)
I found the answer in this github issue: .foreach on an untyped DataFrame expects a ForeachWriter[Row], while EventHubsForeachWriter is a ForeachWriter[String], so the stream has to be converted to a Dataset[String] first.
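To make the mismatch concrete, this is the shape .foreach wants from a DataFrame; a minimal sketch (the RowWriter class and its println body are my own illustration, not part of the connector):

import org.apache.spark.sql.{ForeachWriter, Row}

// DataFrame.writeStream.foreach requires a ForeachWriter[Row];
// EventHubsForeachWriter doesn't satisfy that, hence the type mismatch.
class RowWriter extends ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true // accept every partition
  def process(row: Row): Unit = println(row)                 // illustrative side effect only
  def close(errorOrNull: Throwable): Unit = ()               // nothing to clean up
}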
So I figured the following should work:
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._ // for the 'body symbol syntax and the String encoder behind .as[String]

val query =
  streamingSelectDF
    .select(to_json(struct("*")) as 'body)
    .selectExpr("cast(body as string)")
    .as[String]
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()
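For comparison, the connector also documents a built-in "eventhubs" sink format that sidesteps the ForeachWriter type question entirely; a minimal sketch, reusing the same streamingSelectDF and ehConf, with a checkpoint path I chose as an example:

import org.apache.spark.sql.functions.{struct, to_json}

// The built-in sink writes the string "body" column of each row to Event Hubs.
val sinkQuery =
  streamingSelectDF
    .select(to_json(struct("*")) as "body")
    .writeStream
    .format("eventhubs")
    .options(ehConf.toMap)
    .option("checkpointLocation", "/tmp/eventhubs-checkpoint") // assumed example path
    .start()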