Kafka Streams:带有配置参数的 DeserializationExceptionHandler
Kafka Streams: DeserializationExceptionHandler with config parameters
我不熟悉 Java 反射。 MyCustomException class 实现了
DeserializationExceptionHandler 接口和 streamsConfig 中,我知道可以提供 class。但是,有没有办法提供 configs
(在配置方法中)以及 class?能否提供示例代码?
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomException.class);
.
public class MyCustomException implements DeserializationExceptionHandler {
@Override
public void configure(Map<String, ?> configs) {
}
传入的 Map<String, ?> configs
参数应包含您添加到 Properties
中的所有配置,并传递给 KafkaStreams
构造函数。因此,您只需在那里添加您需要的配置,它就会被相应地转发。
我不熟悉 Java 反射。 MyCustomException class 实现了
DeserializationExceptionHandler 接口和 streamsConfig 中,我知道可以提供 class。但是,有没有办法提供 configs
(在配置方法中)以及 class?能否提供示例代码?
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomException.class);
.
public class MyCustomException implements DeserializationExceptionHandler {
@Override
public void configure(Map<String, ?> configs) {
}
传入的 Map<String, ?> configs
参数应包含您添加到 Properties
中的所有配置,并传递给 KafkaStreams
构造函数。因此,您只需在那里添加您需要的配置,它就会被相应地转发。