KafkaStreams,注册 Avro 模式时出错
KafkaStreams, Error Registering Avro Schema
免责声明:我对 KafkaStreams 的体验非常有限。
我不太明白为什么我会收到 org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
和 Schema being registered is incompatible with an earlier schema;
错误,而我所做的只是将主题流式传输到 KTable 中,以便稍后可以在该商店上使用交互式查询。
这是 SerDes 配置。
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
Streams 代码:请注意,此时我不希望对 Stream 进行任何过滤或分组,我只希望数据可用于将来通过 Store 查询。
final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));
每当我 streams.start()
我在序列化时不断收到这些异常。
我有一个模式,所以我使用了 SpecificAvroSerdes,但问题是一样的。
我想我对为什么我的 KTable 试图向 Confluent 注册一个新模式缺少一些基本的理解。
编辑 1:
我现在明白模式寄存器在这里扮演的角色。将 KStream 与 GenericAvroSerde 结合使用,我可以使用主题中的数据,但仍然无法在 KTable 中实现它。
我现在的问题是:
- 为什么我总是在同一个分区和偏移量中收到上述异常,即使我没有调用
streams.cleanUp()
。为什么它不继续(提交)。
- 这个异常似乎是不可恢复的。所有 Streams 线程都死了,导致应用程序宕机。有没有办法规避这个?注意:我已经在使用 LogAndContinue 异常处理程序进行生产和反序列化。
编辑 2:
我能够克服那个异常。我的 StateStore 包含具有不兼容架构的先前条目。在我清除主题并更改 ApplicationId 后,它开始工作了。
虽然这仍然不能否定捕获 Schema being registered is incompatible with an earlier schema;
异常的必要性。这会使 Streams 应用程序停止。我曾尝试使用 streams.setUncaughtExceptionHandler
来记录错误,但这并不能阻止 Streams 线程死亡,之后我什至无法启动它们。肯定有办法解决这个问题?
你描述的异常是致命的,你不能阻止线程死掉atm,因为线程无法取得任何进展而放弃。只有在您解决问题后,您才能重新启动应用程序(正如您观察到的那样)。
顺便说一句:你为什么不简单地使用 builder.table()
来阅读主题?
免责声明:我对 KafkaStreams 的体验非常有限。
我不太明白为什么我会收到 org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
和 Schema being registered is incompatible with an earlier schema;
错误,而我所做的只是将主题流式传输到 KTable 中,以便稍后可以在该商店上使用交互式查询。
这是 SerDes 配置。
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
Streams 代码:请注意,此时我不希望对 Stream 进行任何过滤或分组,我只希望数据可用于将来通过 Store 查询。
final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));
每当我 streams.start()
我在序列化时不断收到这些异常。
我有一个模式,所以我使用了 SpecificAvroSerdes,但问题是一样的。
我想我对为什么我的 KTable 试图向 Confluent 注册一个新模式缺少一些基本的理解。
编辑 1: 我现在明白模式寄存器在这里扮演的角色。将 KStream 与 GenericAvroSerde 结合使用,我可以使用主题中的数据,但仍然无法在 KTable 中实现它。 我现在的问题是:
- 为什么我总是在同一个分区和偏移量中收到上述异常,即使我没有调用
streams.cleanUp()
。为什么它不继续(提交)。 - 这个异常似乎是不可恢复的。所有 Streams 线程都死了,导致应用程序宕机。有没有办法规避这个?注意:我已经在使用 LogAndContinue 异常处理程序进行生产和反序列化。
编辑 2:
我能够克服那个异常。我的 StateStore 包含具有不兼容架构的先前条目。在我清除主题并更改 ApplicationId 后,它开始工作了。
虽然这仍然不能否定捕获 Schema being registered is incompatible with an earlier schema;
异常的必要性。这会使 Streams 应用程序停止。我曾尝试使用 streams.setUncaughtExceptionHandler
来记录错误,但这并不能阻止 Streams 线程死亡,之后我什至无法启动它们。肯定有办法解决这个问题?
你描述的异常是致命的,你不能阻止线程死掉atm,因为线程无法取得任何进展而放弃。只有在您解决问题后,您才能重新启动应用程序(正如您观察到的那样)。
顺便说一句:你为什么不简单地使用 builder.table()
来阅读主题?