重新分区后 Kafka 流不使用 serde

Kafka streams not using serde after repartitioning

我的 Kafka Streams 应用程序正在使用使用以下键值布局的 kafka 主题: String.class -> HistoryEvent.class

打印我当前的主题时可以确认:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 --  SUCCESS     #C:\Daten\file-service\in\crypto.p12

"flow1"是String键,--之后的部分是序列化后的值。

我的流程是这样设置的:

    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));


    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

据我所知,我告诉它使用 StringHistoryEvent serde 使用主题,因为这是主题中的内容。然后我 'rekey' 它使用一个组合密钥,该密钥应该使用为 HistoryEventKey.class 提供的 serde 在本地存储。据我所知,这将导致使用新密钥创建一个额外的主题(可以在 kafka 容器中的主题列表中看到)。这很好。

现在的问题是应用程序无法启动,即使是在干净的环境中只有主题中的一个文档:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

很难从消息中判断出问题的确切位置。它在我的基本主题中说,但这是不可能的,因为那里的密钥不是 HistoryEventKey 类型。由于我在 reduce 中为 HistoryEventKey 提供了一个 serde,因此它也不能与本地商店一起使用。

对我来说唯一有意义的是它与导致重新排列和新主题的selectKey操作有关。但是我无法弄清楚如何为该操作提供 serde。我不想将它设置为默认值,因为它不是默认密钥 serde。

在对执行进行更多调试后,我发现新主题是在 groupByKey 步骤中创建的。您可以提供一个 Grouped 实例,它可以指定用于键和值的 Serde

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
                    .withKeySerde(new HistoryEventKeySerde())
                    .withValueSerde(new HistoryEventSerde())
            )
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

我遇到了非常相似的错误消息,但我没有 groupby,而是加入了。我在这里发帖是为了下一个用谷歌搜索的人。

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

很明显,和原来的问题一样,我不想更改默认的 serdes。

所以在我的例子中,解决方案是在连接中传递一个 Joined 实例,这将允许传递 serdes。请注意,错误消息指向一个 repartition-MAP-...,这有点像转移注意力,因为修复在其他地方。

我是如何修复它的(一个结合的例子)

//...omitted ...

    KStream<String,MySession> mySessions = myStream
    .map((k,v) ->{
      MySession s = new MySession(v);
      k = s.makeKey();
      return new KeyValue<>(k, s);
    });
// ^ the mapping causes the repartition, you can not however specify a serde in there.


// but in the join right below, we can pass a JOINED instance and fix it.
    return enrichedSessions
      .leftJoin(
        myTable,
        (session, info) -> {
          session.infos = info;
          return session; },
        Joined.as("my_enriched_session")
              .keySerde(Serdes.String())
              .valueSerde(MySessionSerde())
      );