KafkaStreams - 不一致的组协议异常
KafkaStreams - InconsistentGroupProtocolException
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
我的代码库的另一部分直接使用消费者客户端与我们的集群建立连接。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如通过各种钩子或其他方式),但我更好奇为什么这些方法的混合有时会(间歇性地)导致抛出 InconsistentGroupProtocolException
。
有人可以解释一下为什么会抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜想由 Kafka Streams 构建的底层消费者正在指定与 KafkaConsumer
客户端不同的分区协议。无论如何,我们将不胜感激对理解此异常的任何帮助
你自己填答案。 Kafka Streams 使用自定义分区分配器,并且 Kafka Streams 客户端只能与其他 Kafka Streams 客户端一起使用。如果您使用与 Kafka Streams 应用具有相同组 ID 的 KafkaConsumer
,它将无法阻止 KafkaConsumer
加入 Kafka Streams 消费者组。显然,KafkaConsumer
不能 "play" 使用 Kafka Streams。
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
我的代码库的另一部分直接使用消费者客户端与我们的集群建立连接。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如通过各种钩子或其他方式),但我更好奇为什么这些方法的混合有时会(间歇性地)导致抛出 InconsistentGroupProtocolException
。
有人可以解释一下为什么会抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜想由 Kafka Streams 构建的底层消费者正在指定与 KafkaConsumer
客户端不同的分区协议。无论如何,我们将不胜感激对理解此异常的任何帮助
你自己填答案。 Kafka Streams 使用自定义分区分配器,并且 Kafka Streams 客户端只能与其他 Kafka Streams 客户端一起使用。如果您使用与 Kafka Streams 应用具有相同组 ID 的 KafkaConsumer
,它将无法阻止 KafkaConsumer
加入 Kafka Streams 消费者组。显然,KafkaConsumer
不能 "play" 使用 Kafka Streams。