如何将 DataFrame(从 foreach 内部的 RDD 构建)写入 Kafka?
How to write DataFrame (built from RDD inside foreach) to Kafka?
我正在尝试将 DataFrame
从 Spark 写入 Kafka,但我找不到任何解决方案。你能告诉我怎么做吗?
这是我当前的代码:
activityStream.foreachRDD { rdd =>
val activityDF = rdd
.toDF()
.selectExpr(
"timestamp_hour", "referrer", "action",
"prevPage", "page", "visitor", "product", "inputProps.topic as topic")
val producerRecord = new ProducerRecord(topicc, activityDF)
kafkaProducer.send(producerRecord) // <--- this shows an error
}
类型不匹配;发现:org.apache.kafka.clients.producer.ProducerRecord[Nothing,org .apache.spark.sql.Da taFrame](扩展为)org.apache.kafka.clients.producer.ProducerRecord[Nothing,org .apache.spark.sql.Da taset[org.apache.spa rk.sql.Row]] 要求:org.apache.kafka.clients.producer.ProducerRecord[Nothing,String] 涉及默认参数的应用程序发生错误。
在 activityDF
上执行 collect
以获取记录(不是 Dataset[Row]
)并将它们保存到 Kafka。
请注意,您将在 collect
之后得到一组记录,因此您可能需要对其进行迭代,例如
val activities = activityDF.collect()
// the following is pure Scala and has nothing to do with Spark
activities.foreach { a: Row =>
val pr: ProducerRecord = // map a to pr
kafkaProducer.send(pr)
}
在 Row
上使用模式匹配将其解构为 fields/columns,例如
activities.foreach { case Row(timestamp_hour, referrer, action, prevPage, page, visitor, product, topic) =>
// ...transform a to ProducerRecord
kafkaProducer.send(pr)
}
提示:我强烈建议使用 case class
并将 DataFrame
(= Dataset[Row]
) 转换为 Dataset[YourCaseClass]
.
参见 Spark SQL 的 Row and Kafka's ProducerRecord 文档。
正如 Joe Nate 在评论中指出的那样:
If you do "collect" before writing to any endpoint, it's going to make all the data aggregate at the driver and then make the driver write it out. 1) Can crash the driver if too much data (2) no parallelism in write.
100% 正确。我希望我说过:)
您可能想要使用 Writing Stream Output to Kafka 中描述的方法。
我正在尝试将 DataFrame
从 Spark 写入 Kafka,但我找不到任何解决方案。你能告诉我怎么做吗?
这是我当前的代码:
activityStream.foreachRDD { rdd =>
val activityDF = rdd
.toDF()
.selectExpr(
"timestamp_hour", "referrer", "action",
"prevPage", "page", "visitor", "product", "inputProps.topic as topic")
val producerRecord = new ProducerRecord(topicc, activityDF)
kafkaProducer.send(producerRecord) // <--- this shows an error
}
类型不匹配;发现:org.apache.kafka.clients.producer.ProducerRecord[Nothing,org .apache.spark.sql.Da taFrame](扩展为)org.apache.kafka.clients.producer.ProducerRecord[Nothing,org .apache.spark.sql.Da taset[org.apache.spa rk.sql.Row]] 要求:org.apache.kafka.clients.producer.ProducerRecord[Nothing,String] 涉及默认参数的应用程序发生错误。
在 activityDF
上执行 collect
以获取记录(不是 Dataset[Row]
)并将它们保存到 Kafka。
请注意,您将在 collect
之后得到一组记录,因此您可能需要对其进行迭代,例如
val activities = activityDF.collect()
// the following is pure Scala and has nothing to do with Spark
activities.foreach { a: Row =>
val pr: ProducerRecord = // map a to pr
kafkaProducer.send(pr)
}
在 Row
上使用模式匹配将其解构为 fields/columns,例如
activities.foreach { case Row(timestamp_hour, referrer, action, prevPage, page, visitor, product, topic) =>
// ...transform a to ProducerRecord
kafkaProducer.send(pr)
}
提示:我强烈建议使用 case class
并将 DataFrame
(= Dataset[Row]
) 转换为 Dataset[YourCaseClass]
.
参见 Spark SQL 的 Row and Kafka's ProducerRecord 文档。
正如 Joe Nate 在评论中指出的那样:
If you do "collect" before writing to any endpoint, it's going to make all the data aggregate at the driver and then make the driver write it out. 1) Can crash the driver if too much data (2) no parallelism in write.
100% 正确。我希望我说过:)
您可能想要使用 Writing Stream Output to Kafka 中描述的方法。