Kafka Streams:使用相同的“application.id”从多个主题中消费
Kafka Streams: use the same `application.id` to consume from multiple topics
我有一个应用程序需要收听多个不同的主题;每个主题对于如何处理消息都有单独的逻辑。我曾想过为每个 KafkaStreams 实例使用相同的 kafka 属性,但我收到如下所示的错误。
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
代码 (kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
我发现这个 reference that suggest that you can not use application.id
for multiple topics, however I am finding it hard to find reference documentation to support that. The documentation application.id
状态:
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
问题
- 这个错误是什么意思,是什么原因造成的。
- 鉴于您可以让应用程序的多个实例 运行 从多个主题分区使用相同的 ID,"Must be unique within the Kafka cluster" 是什么意思?
- 您可以使用相同的 Kafka 流
application.id
启动两个 KafkaStreams
列出不同的主题吗?如果是,怎么做?
详情: kafka 0.11.0.2
Kafka Streams 通过分区而不是主题进行扩展。因此,如果您使用相同的 application.id
启动多个应用程序,它们在订阅的输入主题和处理逻辑方面必须相同。该应用程序使用 application.id
作为 group.id
形成一个消费者组,因此输入主题的不同分区被分配给不同的实例。
如果您有与相同逻辑的不同主题,您可以订阅全部 主题一次(在您开始的每个实例中)。缩放仍然基于分区。 (它基本上是您输入主题的 "merge"。)
如果您想通过主题进行扩展 and/or 有不同的处理逻辑,您必须对不同的 Kafka Streams 应用程序使用不同的 application.id
。
我有一个应用程序需要收听多个不同的主题;每个主题对于如何处理消息都有单独的逻辑。我曾想过为每个 KafkaStreams 实例使用相同的 kafka 属性,但我收到如下所示的错误。
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
代码 (kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
我发现这个 reference that suggest that you can not use application.id
for multiple topics, however I am finding it hard to find reference documentation to support that. The documentation application.id
状态:
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
问题
- 这个错误是什么意思,是什么原因造成的。
- 鉴于您可以让应用程序的多个实例 运行 从多个主题分区使用相同的 ID,"Must be unique within the Kafka cluster" 是什么意思?
- 您可以使用相同的 Kafka 流
application.id
启动两个KafkaStreams
列出不同的主题吗?如果是,怎么做?
详情: kafka 0.11.0.2
Kafka Streams 通过分区而不是主题进行扩展。因此,如果您使用相同的 application.id
启动多个应用程序,它们在订阅的输入主题和处理逻辑方面必须相同。该应用程序使用 application.id
作为 group.id
形成一个消费者组,因此输入主题的不同分区被分配给不同的实例。
如果您有与相同逻辑的不同主题,您可以订阅全部 主题一次(在您开始的每个实例中)。缩放仍然基于分区。 (它基本上是您输入主题的 "merge"。)
如果您想通过主题进行扩展 and/or 有不同的处理逻辑,您必须对不同的 Kafka Streams 应用程序使用不同的 application.id
。