在 Scala 中将 Dataframe 的多列写入 Kafka
Writing multiple columns of a Dataframe to Kafka in Scala
构建 ,如何将数据帧的所有列写入 kafka 主题。
目前我有一个包含一些列的数据框,我应该用一个键将其写入 kafka,因此我从旧数据框创建一个新数据框并指定键和值:
val endDf: DataFrame = midDf.withColumn("key",lit(keyval)).withColumn("value",lit(testVal))
现在,当我将此写入 kafka 时,我指定:
endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "test:8808")
.option("topic", "topic1")
.save()
如果值是单列,则此方法有效。但是初始数据框由多列组成,我需要以 JSON 格式编写所有这些。
如何将所有列写为 value
。我觉得它围绕使用 interDf.columns
和 to_json
的合并展开
Kafka 需要一个 key 和一个 value;因此,您必须使用 to_json()
:
将所有剩余列(即键列除外)聚合为一个值
import org.apache.spark.sql.functions._
val value_col_names = endDf.columns.filter(_ != "yourKeyColumn")
endDf.withColumnRenamed("yourKeyColumn", "key") \
.withColumn("value", to_json(struct(value_col_names.map(col(_)):_*))) \
.select("key", "value") \
.write() \
.format("kafka") \
.option("kafka.bootstrap.servers", "test:8808") \
.option("topic", "topic1") \
.save()
构建
目前我有一个包含一些列的数据框,我应该用一个键将其写入 kafka,因此我从旧数据框创建一个新数据框并指定键和值:
val endDf: DataFrame = midDf.withColumn("key",lit(keyval)).withColumn("value",lit(testVal))
现在,当我将此写入 kafka 时,我指定:
endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "test:8808")
.option("topic", "topic1")
.save()
如果值是单列,则此方法有效。但是初始数据框由多列组成,我需要以 JSON 格式编写所有这些。
如何将所有列写为 value
。我觉得它围绕使用 interDf.columns
和 to_json
Kafka 需要一个 key 和一个 value;因此,您必须使用 to_json()
:
import org.apache.spark.sql.functions._
val value_col_names = endDf.columns.filter(_ != "yourKeyColumn")
endDf.withColumnRenamed("yourKeyColumn", "key") \
.withColumn("value", to_json(struct(value_col_names.map(col(_)):_*))) \
.select("key", "value") \
.write() \
.format("kafka") \
.option("kafka.bootstrap.servers", "test:8808") \
.option("topic", "topic1") \
.save()