将 Spark DataFrame 写入 Kafka 会忽略分区列和 kafka.partitioner.class
Writing Spark DataFrame to Kafka is ignoring the partition column and kafka.partitioner.class
我正在尝试将 Spark DF(批处理 DF)写入 Kafka,我需要将数据写入特定分区。
我尝试了以下代码
myDF.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
.option("kafka.security.protocol", "SSL")
.option("kafka.truststore.location", kafkaProps.getTrustStoreLocation)
.option("kafka.truststore.password", kafkaProps.getTrustStorePassword)
.option("kafka.keystore.location", kafkaProps.getKeyStoreLocation)
.option("kafka.keystore.password", kafkaProps.getKeyStorePassword)
.option("kafka.partitioner.class", "util.MyCustomPartitioner")
.option("topic",kafkaProps.getTopicName)
.save()
而我正在编写的 DF 的架构是
+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+
我不得不重新分区(到 1 个分区)“myDF”,因为我需要根据日期列对数据进行排序。
它正在将数据写入单个分区,而不是 DF 的“分区”列中的分区或自定义分区程序返回的分区(与分区列中的值相同)。
谢谢
萨提什
根据 2.4.7 docs
,在您的 Dataframe 中使用列“分区”的功能仅适用于版本 3.x,而不适用于更早的版本
但是,使用选项 kafka.partitioner.class
仍然有效。 Spark Structured Streaming 允许您在使用前缀 kafka.
时使用纯 KafkaConsumer 配置,因此这也适用于版本 2.4.4.
以下代码在 Spark 3.0.1 和 Confluent 社区版 5.5.0 上运行良好。在 Spark 2.4.4 上,“分区”列没有任何影响,但我的自定义分区程序 class 适用。
case class KafkaRecord(partition: Int, value: String)
val spark = SparkSession.builder()
.appName("test")
.master("local[*]")
.getOrCreate()
// create DataFrame
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
您随后在控制台消费者中看到的内容:
# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice
和
# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob
使用自定义 Partitioner
时也得到相同的结果
.option("kafka.partitioner.class", "org.test.CustomPartitioner")
我的自定义分区程序定义为
package org.test
class CustomPartitioner extends Partitioner {
override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any,valueBytes: Array[Byte],cluster: Cluster): Int = {
if (!valueBytes.isEmpty && valueBytes.map(_.toChar).mkString == "Bob") {
0
} else {
1
}
}
}
我正在尝试将 Spark DF(批处理 DF)写入 Kafka,我需要将数据写入特定分区。
我尝试了以下代码
myDF.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
.option("kafka.security.protocol", "SSL")
.option("kafka.truststore.location", kafkaProps.getTrustStoreLocation)
.option("kafka.truststore.password", kafkaProps.getTrustStorePassword)
.option("kafka.keystore.location", kafkaProps.getKeyStoreLocation)
.option("kafka.keystore.password", kafkaProps.getKeyStorePassword)
.option("kafka.partitioner.class", "util.MyCustomPartitioner")
.option("topic",kafkaProps.getTopicName)
.save()
而我正在编写的 DF 的架构是
+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+
我不得不重新分区(到 1 个分区)“myDF”,因为我需要根据日期列对数据进行排序。
它正在将数据写入单个分区,而不是 DF 的“分区”列中的分区或自定义分区程序返回的分区(与分区列中的值相同)。
谢谢 萨提什
根据 2.4.7 docs
,在您的 Dataframe 中使用列“分区”的功能仅适用于版本 3.x,而不适用于更早的版本但是,使用选项 kafka.partitioner.class
仍然有效。 Spark Structured Streaming 允许您在使用前缀 kafka.
时使用纯 KafkaConsumer 配置,因此这也适用于版本 2.4.4.
以下代码在 Spark 3.0.1 和 Confluent 社区版 5.5.0 上运行良好。在 Spark 2.4.4 上,“分区”列没有任何影响,但我的自定义分区程序 class 适用。
case class KafkaRecord(partition: Int, value: String)
val spark = SparkSession.builder()
.appName("test")
.master("local[*]")
.getOrCreate()
// create DataFrame
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
您随后在控制台消费者中看到的内容:
# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice
和
# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob
使用自定义 Partitioner
.option("kafka.partitioner.class", "org.test.CustomPartitioner")
我的自定义分区程序定义为
package org.test
class CustomPartitioner extends Partitioner {
override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any,valueBytes: Array[Byte],cluster: Cluster): Int = {
if (!valueBytes.isEmpty && valueBytes.map(_.toChar).mkString == "Bob") {
0
} else {
1
}
}
}