如何使用 subscribePattern 订阅新主题?
How to subscribe to a new topic with subscribePattern?
我正在使用 Spark Structured streaming with Kafka 并且主题被订阅为模式:
option("subscribePattern", "topic.*")
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
一旦我开始工作并且列出了一个新主题,比如 topic.new_topic
,该工作不会自动开始收听新主题,它需要重新启动。
有没有办法在不重启作业的情况下自动订阅新模式?
火花:3.0.0
KafkaConsumer 的默认行为是每 5 分钟检查一次是否有新分区要使用。这个配置是通过Consumer config
设置的
metadata.max.age.ms: The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
根据 Kafka Specific Configuration 上的 Spark + Kafka 集成指南,您可以使用前缀 kafka.
设置此配置,如下所示:
.option("kafka.metadata.max.age.ms", "1000")
通过这个设置,新创建的topic会在创建后1秒被消费
(使用 Spark 3.0.0 和 Kafka Broker 2.5.0 测试)
我正在使用 Spark Structured streaming with Kafka 并且主题被订阅为模式:
option("subscribePattern", "topic.*")
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
一旦我开始工作并且列出了一个新主题,比如 topic.new_topic
,该工作不会自动开始收听新主题,它需要重新启动。
有没有办法在不重启作业的情况下自动订阅新模式?
火花:3.0.0
KafkaConsumer 的默认行为是每 5 分钟检查一次是否有新分区要使用。这个配置是通过Consumer config
设置的metadata.max.age.ms: The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
根据 Kafka Specific Configuration 上的 Spark + Kafka 集成指南,您可以使用前缀 kafka.
设置此配置,如下所示:
.option("kafka.metadata.max.age.ms", "1000")
通过这个设置,新创建的topic会在创建后1秒被消费
(使用 Spark 3.0.0 和 Kafka Broker 2.5.0 测试)