Using Kafka Streams with Serdes that rely on schema references in the Headers

I'm trying to use Kafka Streams to perform a KTable-KTable foreign-key join on CDC data. The data I'll be reading is in Avro format, but it is serialized in a way that is incompatible with other common serializers/deserializers (e.g. the Confluent Schema Registry ones), because the schema identifier is stored in the headers.

When I set up the Serdes for my KTables, my Kafka Streams application runs initially but eventually fails, because it internally calls the Serializer method byte[] serialize(String topic, T data) instead of the overload that takes headers, byte[] serialize(String topic, Headers headers, T data) (specifically, inside the wrapping serializer ValueAndTimestampSerializer). The Serdes I'm using cannot handle this and throw an exception.

My first question is: does anyone know a way to make Kafka Streams internally call the method with the correct signature?

I'm exploring ways around this, including writing new Serdes that re-serialize the data with the schema identifier embedded in the message body itself. This would probably involve copying the data to a new topic or using interceptors.
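One way to do that re-serialization is to move the schema id from the header into the payload using the same wire format the Confluent serializers expect (a magic byte followed by a 4-byte big-endian schema id, then the Avro bytes). A minimal sketch, with a hypothetical helper class:

```java
import java.nio.ByteBuffer;

// Hypothetical helper that embeds a schema id in the payload, mimicking the
// Confluent wire format: [magic byte 0x0][4-byte big-endian schema id][avro bytes].
public final class SchemaIdEmbedder {
    private static final byte MAGIC_BYTE = 0x0;

    // Prepend the magic byte and schema id to the raw Avro payload.
    public static byte[] embed(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId); // ByteBuffer is big-endian by default
        buf.put(avroPayload);
        return buf.array();
    }

    // Read the schema id back out of an embedded message.
    public static int extractSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }
}
```

A new Serde's header-aware serialize() could call embed() with the id taken from the headers, after which the plain two-argument overload (and downstream registry-aware deserializers) would no longer need the headers at all.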

However, I know that a ValueTransformer has access to the headers through the ProcessorContext, and I'm wondering whether there is a quicker way using transformValues(). The idea is to first read the value as a byte[] and then deserialize it into my Avro class inside the transformer (see the example below). However, when I do this, I get an exception.

StreamsBuilder builder = new StreamsBuilder();

final KTable<Long, MySpecificClass> myTable = builder
    .table(
        "my-topic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

...

KTable<Long, JoinResult> joinResultTable =
    myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

joinResultTable.toStream()...

public class MyDeserializerTransformer implements
    ValueTransformerWithKey<Long, byte[], MySpecificClass> {
  MyAvroDeserializer deserializer;
  ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    deserializer = new MyAvroDeserializer();
    this.context = context;
  }

  @Override
  public MySpecificClass transform(Long key, byte[] value) {
    return deserializer.deserialize(context.topic(), context.headers(), value);
  }

  @Override
  public void close() {

  }
}

When I run this, I get a ClassCastException. How can I fix this or find a workaround? Do I need to use an auxiliary state store?

"class": "org.apache.kafka.streams.errors.StreamsException",
    "msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.\nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
    "stack": [
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
      "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:181)",
      "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
 "class": "java.lang.ClassCastException",
      "msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
      "stack": [
        "org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
        "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:181)",
        "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",

I was able to solve this problem by first reading the input topic as a KStream and converting it to a KTable with a different Serde as a second step. It seems the problem was that the state stores never invoke the serializer/deserializer method signatures that take headers.
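The workaround above can be sketched as follows (MySpecificClass, MyDeserializerTransformer, and the value serde passed in are placeholders from the question; the exact serde you materialize with depends on your setup):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class HeaderAwareTableBuilder {

    // Read the topic as a KStream of raw bytes, deserialize in a transformer
    // that can see the record headers, then convert to a KTable whose state
    // store uses a serde that no longer needs the headers.
    public static KTable<Long, MySpecificClass> build(
            StreamsBuilder builder,
            Serde<MySpecificClass> valueSerde) {
        return builder
            .stream("my-topic", Consumed.with(Serdes.Long(), Serdes.ByteArray()))
            .transformValues(MyDeserializerTransformer::new)
            .toTable(Materialized.with(Serdes.Long(), valueSerde));
    }
}
```

Because the header-dependent deserialization happens in the transformer, before the record ever reaches a state store, the store (and the foreign-key join's internal subscription topics) only ever see the value through the header-free serde, avoiding the ClassCastException.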