为什么 Kafka KTable 缺少条目?

Why Kafka KTable is missing entries?

我有一个使用 Kafka Streams 中的 KTable 的实例 java 应用程序。直到最近,当某些消息突然消失时,我才可以使用 KTable 检索所有数据。那里应该有 ~33k 条带有唯一键的消息。

当我想按键检索消息时,我没有收到一些消息。我使用 ReadOnlyKeyValueStore 来检索消息:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);

这些是我为 KafkaStreams 设置的配置设置。

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

卡夫卡: 0.10.2.0-cp1
融合:3.2.0

调查让我有了一些非常令人担忧的见解。使用 REST 代理我手动读取分区并发现一些偏移 return 错误。

要求: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{
    "error_code": 50002,
    "message": "Kafka error: Fetch response contains an error code: 1"
}

没有客户端,java 和命令行都没有,但是 return 没有任何错误。他们只是跳过导致 KTables 中丢失数据的 错误 丢失消息。一切都很好,似乎有些消息不知何故损坏了。

我有两个代理,所有主题的复制因子都是 2,并且已完全复制。两个经纪人分别 return 相同。重启经纪人没有区别。

来自 default Kafka Broker config key cleanup.policy is set to delete. Set it to compact to keep the latest message for each key. See compaction

删除旧消息不会更改最小偏移量,因此尝试检索低于它的消息会导致错误。错误非常模糊。 Kafka Streams 客户端将从最小偏移量开始读取消息,因此没有错误。唯一可见的影响是 KTables 中缺少数据。

虽然应用程序 运行 由于 caches 所有数据可能仍然可用,即使从 Kafka 本身删除消息后也是如此。它们会在清理后消失。