在 Scala 中将 Dataframe 的多列写入 Kafka

Writing multiple columns of a Dataframe to Kafka in Scala

构建 ,如何将数据帧的所有列写入 kafka 主题。

目前我有一个包含一些列的数据框,我应该用一个键将其写入 kafka,因此我从旧数据框创建一个新数据框并指定键和值:

val endDf: DataFrame = midDf.withColumn("key",lit(keyval)).withColumn("value",lit(testVal))

现在,当我将此写入 kafka 时,我指定:

endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "test:8808")
  .option("topic", "topic1")
  .save()

如果值是单列,则此方法有效。但是初始数据框由多列组成,我需要以 JSON 格式编写所有这些。

如何将所有列写为 value。我觉得它围绕使用 interDf.columnsto_json

的合并展开

Kafka 需要一个 key 和一个 value;因此,您必须使用 to_json():

将所有剩余列(即键列除外)聚合为一个值
import org.apache.spark.sql.functions._

val value_col_names = endDf.columns.filter(_ != "yourKeyColumn") 

endDf.withColumnRenamed("yourKeyColumn", "key") \ 
     .withColumn("value", to_json(struct(value_col_names.map(col(_)):_*))) \
     .select("key", "value") \ 
     .write() \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", "test:8808") \ 
     .option("topic", "topic1") \ 
     .save()