如何使用 subscribePattern 订阅新主题?

How to subscribe to a new topic with subscribePattern?

我正在使用 Spark Structured streaming with Kafka 并且主题被订阅为模式:

option("subscribePattern", "topic.*")

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()

一旦我开始工作并且列出了一个新主题,比如 topic.new_topic,该工作不会自动开始收听新主题,它需要重新启动。

有没有办法在不重启作业的情况下自动订阅新模式?

火花:3.0.0

KafkaConsumer 的默认行为是每 5 分钟检查一次是否有新分区要使用。这个配置是通过Consumer config

设置的

metadata.max.age.ms: The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

根据 Kafka Specific Configuration 上的 Spark + Kafka 集成指南,您可以使用前缀 kafka. 设置此配置,如下所示:

.option("kafka.metadata.max.age.ms", "1000")

通过这个设置,新创建的topic会在创建后1秒被消费

(使用 Spark 3.0.0 和 Kafka Broker 2.5.0 测试)