为 Kafka Streams 配置 Serdes 的问题
Issue with configuring Serdes for Kafka Streams
我将一个 json 对象放入我的“提交”主题。我想使用 Kafka Streams 消费消息,但是出现错误
@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());
return new KafkaStreamsConfiguration(props);
}
@Bean
public Serde<Commit> commitSerde() {
return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
@Bean
public KStream<String, Commit> kStream(StreamsBuilder builder) {
KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));
KTable<String, Long> commitsCount = stream
.mapValues(Commit::getAuthorName)
.selectKey((key, word) -> word)
.groupByKey()
.count(Materialized.as("Counts"));
commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));
return stream;
}
}
日志说:
线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”中出现异常org.apache.kafka.streams.errors.StreamsException:无法配置值 serde class org.apache.kafka.common.serialization.Serdes$WrapperSerde
原因:org.apache.kafka.common.KafkaException:无法为 org.apache.kafka.common.serialization.Serdes$WrapperSerde
找到 public 无参数构造函数
原因:java.lang.NoSuchMethodException
您的问题是 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
的注册问题。首先,这在您的示例中不需要,因为您在创建 KStream 时使用的 Consumed
中指定了值 serde。您可以省略默认的 serde。
如果您将 class 注册为默认 serde,Kafka Streams 将在某个时候通过反射创建该 class 的实例。这会调用 class 的默认(无参数)构造函数。在您的示例中,将从 Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class))
使用 org.apache.kafka.common.serialization.Serdes$WrapperSerde class。这个 class 没有这样的构造函数,导致异常。
如果你想为你的 Commit
类型注册一个默认的 serde,你需要将它包装成一个小的 class:
public class CommitSerde extends WrapperSerde<Commit> {
public CommitSerde() {
super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
}
这个 class 应该适合在您的示例中使用 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommitSerde.class.getName());
注册为默认值 serde。
我将一个 json 对象放入我的“提交”主题。我想使用 Kafka Streams 消费消息,但是出现错误
@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());
return new KafkaStreamsConfiguration(props);
}
@Bean
public Serde<Commit> commitSerde() {
return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
@Bean
public KStream<String, Commit> kStream(StreamsBuilder builder) {
KStream<String, Commit> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));
KTable<String, Long> commitsCount = stream
.mapValues(Commit::getAuthorName)
.selectKey((key, word) -> word)
.groupByKey()
.count(Materialized.as("Counts"));
commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));
return stream;
}
}
日志说:
线程“test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1”中出现异常org.apache.kafka.streams.errors.StreamsException:无法配置值 serde class org.apache.kafka.common.serialization.Serdes$WrapperSerde
原因:org.apache.kafka.common.KafkaException:无法为 org.apache.kafka.common.serialization.Serdes$WrapperSerde
找到 public 无参数构造函数原因:java.lang.NoSuchMethodException
您的问题是 StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
的注册问题。首先,这在您的示例中不需要,因为您在创建 KStream 时使用的 Consumed
中指定了值 serde。您可以省略默认的 serde。
如果您将 class 注册为默认 serde,Kafka Streams 将在某个时候通过反射创建该 class 的实例。这会调用 class 的默认(无参数)构造函数。在您的示例中,将从 Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class))
使用 org.apache.kafka.common.serialization.Serdes$WrapperSerde class。这个 class 没有这样的构造函数,导致异常。
如果你想为你的 Commit
类型注册一个默认的 serde,你需要将它包装成一个小的 class:
public class CommitSerde extends WrapperSerde<Commit> {
public CommitSerde() {
super(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
}
这个 class 应该适合在您的示例中使用 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CommitSerde.class.getName());
注册为默认值 serde。