如何从 Spark 结构化流中的特定 Kafka 分区读取
How to read from specific Kafka partition in Spark structured streaming
我的 Kafka 主题有三个分区,我想知道我是否可以只从三个分区中的一个分区读取数据。我的消费者是 spark 结构化流应用程序。
下面是我在 spark 中现有的 kafka 设置。
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.load()
这是从特定分区读取数据的方法。
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("assign", """{"topic":[0]}""")
.option("startingOffsets", "latest")
.load()
PS: 从多个分区而不是 1 读取 --> """{"topic":[0,1,2..n]}""""
同理,如何写入特定分区。我试过了,但没用。
someDF
.selectExpr("key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("topic", "someTopic")
.option("partition", partIdx)
.start()
我的 Kafka 主题有三个分区,我想知道我是否可以只从三个分区中的一个分区读取数据。我的消费者是 spark 结构化流应用程序。
下面是我在 spark 中现有的 kafka 设置。
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.load()
这是从特定分区读取数据的方法。
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("assign", """{"topic":[0]}""")
.option("startingOffsets", "latest")
.load()
PS: 从多个分区而不是 1 读取 --> """{"topic":[0,1,2..n]}""""
同理,如何写入特定分区。我试过了,但没用。
someDF
.selectExpr("key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("topic", "someTopic")
.option("partition", partIdx)
.start()