KafkaStreams,注册 Avro 模式时出错

KafkaStreams, Error Registering Avro Schema

免责声明:我对 KafkaStreams 的体验非常有限。 我不太明白为什么我会收到 org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:Schema being registered is incompatible with an earlier schema; 错误,而我所做的只是将主题流式传输到 KTable 中,以便稍后可以在该商店上使用交互式查询。

这是 SerDes 配置。

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

Streams 代码:请注意,此时我不希望对 Stream 进行任何过滤或分组,我只希望数据可用于将来通过 Store 查询。

final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));

每当我 streams.start() 我在序列化时不断收到这些异常。 我有一个模式,所以我使用了 SpecificAvroSerdes,但问题是一样的。 我想我对为什么我的 KTable 试图向 Confluent 注册一个新模式缺少一些基本的理解。

编辑 1: 我现在明白模式寄存器在这里扮演的角色。将 KStream 与 GenericAvroSerde 结合使用,我可以使用主题中的数据,但仍然无法在 KTable 中实现它。 我现在的问题是:

  1. 为什么我总是在同一个分区和偏移量中收到上述异常,即使我没有调用 streams.cleanUp()。为什么它不继续(提交)。
  2. 这个异常似乎是不可恢复的。所有 Streams 线程都死了,导致应用程序宕机。有没有办法规避这个?注意:我已经在使用 LogAndContinue 异常处理程序进行生产和反序列化。

编辑 2:

我能够克服那个异常。我的 StateStore 包含具有不兼容架构的先前条目。在我清除主题并更改 ApplicationId 后,它开始工作了。

虽然这仍然不能否定捕获 Schema being registered is incompatible with an earlier schema; 异常的必要性。这会使 Streams 应用程序停止。我曾尝试使用 streams.setUncaughtExceptionHandler 来记录错误,但这并不能阻止 Streams 线程死亡,之后我什至无法启动它们。肯定有办法解决这个问题?

你描述的异常是致命的,你不能阻止线程死掉atm,因为线程无法取得任何进展而放弃。只有在您解决问题后,您才能重新启动应用程序(正如您观察到的那样)。

顺便说一句:你为​​什么不简单地使用 builder.table() 来阅读主题?