重新分区后 Kafka 流不使用 serde
Kafka streams not using serde after repartitioning
我的 Kafka Streams 应用程序正在使用使用以下键值布局的 kafka 主题:
String.class -> HistoryEvent.class
打印我当前的主题时可以确认:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 -- SUCCESS #C:\Daten\file-service\in\crypto.p12
"flow1"是String
键,--
之后的部分是序列化后的值。
我的流程是这样设置的:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
据我所知,我告诉它使用 String
和 HistoryEvent
serde 使用主题,因为这是主题中的内容。然后我 'rekey' 它使用一个组合密钥,该密钥应该使用为 HistoryEventKey.class
提供的 serde 在本地存储。据我所知,这将导致使用新密钥创建一个额外的主题(可以在 kafka 容器中的主题列表中看到)。这很好。
现在的问题是应用程序无法启动,即使是在干净的环境中只有主题中的一个文档:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
很难从消息中判断出问题的确切位置。它在我的基本主题中说,但这是不可能的,因为那里的密钥不是 HistoryEventKey
类型。由于我在 reduce
中为 HistoryEventKey
提供了一个 serde,因此它也不能与本地商店一起使用。
对我来说唯一有意义的是它与导致重新排列和新主题的selectKey
操作有关。但是我无法弄清楚如何为该操作提供 serde。我不想将它设置为默认值,因为它不是默认密钥 serde。
在对执行进行更多调试后,我发现新主题是在 groupByKey
步骤中创建的。您可以提供一个 Grouped
实例,它可以指定用于键和值的 Serde
:
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
.withKeySerde(new HistoryEventKeySerde())
.withValueSerde(new HistoryEventSerde())
)
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
我遇到了非常相似的错误消息,但我没有 groupby,而是加入了。我在这里发帖是为了下一个用谷歌搜索的人。
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
很明显,和原来的问题一样,我不想更改默认的 serdes。
所以在我的例子中,解决方案是在连接中传递一个 Joined 实例,这将允许传递 serdes。请注意,错误消息指向一个 repartition-MAP-...
,这有点像转移注意力,因为修复在其他地方。
我是如何修复它的(一个结合的例子)
//...omitted ...
KStream<String,MySession> mySessions = myStream
.map((k,v) ->{
MySession s = new MySession(v);
k = s.makeKey();
return new KeyValue<>(k, s);
});
// ^ the mapping causes the repartition, you can not however specify a serde in there.
// but in the join right below, we can pass a JOINED instance and fix it.
return enrichedSessions
.leftJoin(
myTable,
(session, info) -> {
session.infos = info;
return session; },
Joined.as("my_enriched_session")
.keySerde(Serdes.String())
.valueSerde(MySessionSerde())
);
我的 Kafka Streams 应用程序正在使用使用以下键值布局的 kafka 主题:
String.class -> HistoryEvent.class
打印我当前的主题时可以确认:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 -- SUCCESS #C:\Daten\file-service\in\crypto.p12
"flow1"是String
键,--
之后的部分是序列化后的值。
我的流程是这样设置的:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
据我所知,我告诉它使用 String
和 HistoryEvent
serde 使用主题,因为这是主题中的内容。然后我 'rekey' 它使用一个组合密钥,该密钥应该使用为 HistoryEventKey.class
提供的 serde 在本地存储。据我所知,这将导致使用新密钥创建一个额外的主题(可以在 kafka 容器中的主题列表中看到)。这很好。
现在的问题是应用程序无法启动,即使是在干净的环境中只有主题中的一个文档:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
很难从消息中判断出问题的确切位置。它在我的基本主题中说,但这是不可能的,因为那里的密钥不是 HistoryEventKey
类型。由于我在 reduce
中为 HistoryEventKey
提供了一个 serde,因此它也不能与本地商店一起使用。
对我来说唯一有意义的是它与导致重新排列和新主题的selectKey
操作有关。但是我无法弄清楚如何为该操作提供 serde。我不想将它设置为默认值,因为它不是默认密钥 serde。
在对执行进行更多调试后,我发现新主题是在 groupByKey
步骤中创建的。您可以提供一个 Grouped
实例,它可以指定用于键和值的 Serde
:
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
.withKeySerde(new HistoryEventKeySerde())
.withValueSerde(new HistoryEventSerde())
)
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
我遇到了非常相似的错误消息,但我没有 groupby,而是加入了。我在这里发帖是为了下一个用谷歌搜索的人。
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
很明显,和原来的问题一样,我不想更改默认的 serdes。
所以在我的例子中,解决方案是在连接中传递一个 Joined 实例,这将允许传递 serdes。请注意,错误消息指向一个 repartition-MAP-...
,这有点像转移注意力,因为修复在其他地方。
我是如何修复它的(一个结合的例子)
//...omitted ...
KStream<String,MySession> mySessions = myStream
.map((k,v) ->{
MySession s = new MySession(v);
k = s.makeKey();
return new KeyValue<>(k, s);
});
// ^ the mapping causes the repartition, you can not however specify a serde in there.
// but in the join right below, we can pass a JOINED instance and fix it.
return enrichedSessions
.leftJoin(
myTable,
(session, info) -> {
session.infos = info;
return session; },
Joined.as("my_enriched_session")
.keySerde(Serdes.String())
.valueSerde(MySessionSerde())
);