Kafka Streams 中的 metadataForKey 方法为连接到同一组的多个应用程序实例提供错误信息

Method of metadataForKey in Kafka Streams gives wrong information for multiple instances of application connected to the same group

我正在实施一种机制,该机制通过从商店本地请求或请求远程 Kafka Streams 实例来提供一些元数据信息。

我正在使用 2.4.1 版的 Scala 和 kafka-streams-scala 库

我会尝试用一个简单的小例子来说明我在做什么

  1. 我正在 运行Kafka 集群创建 1 个测试主题和 2 个分区。
  2. 还有我 运行 1 个 Kafka Streams 实例,正如我上面提到的,它实现了从存储请求本地或远程元数据的机制,它保存所有分区信息,直到没有任何其他实例连接到同一组.
  3. 我将一些记录推入测试主题
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));
  1. 我 运行 连接到同一组的 Kafka Streams 的第二个实例,我看到了重新平衡和分区重新分配过程,据我所知,两个应用程序应该在分区之后共享,例如Kafka Streams 应用程序 1 应该与分区 0 一起工作,Kafka Streams 应用程序 2 应该与分区 1 一起工作,或者在重新平衡和重新分配后反之亦然。

下一步是确保 Kafka Streams 以我在第 4 步中描述的方式工作,我运行正在编写以下代码。

val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()

while (it.hasNext) {
  val keyValue: KeyValue[String, String] = it.next();
  println(keyValue)
}

非常棒,我看到了我所期望的。 我在本地主机上 运行 的 Kafka Stream 在重新平衡和分区重新分配后保留分区 1。

KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)

但是当我运行这段代码时,从我的角度来看,我看到了完全出乎意料的输出。

println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

据我了解,我应该期待这样的事情

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}

StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

首先我要注意 metadataForKey 会为您提供一些信息,即使您在商店中没有任何记录,而且似乎托管密钥的信息是随机的.

我意识到问题完全与版本无关,而是与序列化程序有关。

我使用 StringSerializer 从 java 将记录推送到主题中,我尝试使用 Serdes.String.serializer()[ 从 scala 查询元数据=26=] 它给了我与现实不符的随机结果。

我已经创建了另一种将数据推送到主题的方法,使用带有 GenericPrimitiveSerdeString 键序列化程序的 Scala 和 metadataForKey 的相同序列化程序和令我惊讶的是,这次工作如预期。

因此,对于将要使用 metadataForKey 的用户,请注意密钥序列化程序,以便此方法正常工作