Spring Cloud Stream - 如果条件满足如何在 SpecificRecord 中读取,否则如何在 GenericRecord 中读取

Spring Cloud Stream - How to read in SpecificRecord if condition met and in GenericRecord otherwise

当从 Kafka 输入主题读取具有其唯一 Avro 架构的消息类型 A 时,我有服务 A 执行操作 A,而当从同一主题读取消息类型 B 时,服务 B 执行操作 B。

我不希望服务 A 知道 B 的架构,反之亦然。如果我有两个单独的主题,我会轻松地将 spring.kafka.properties.specific.avro.reader 设置为 true 并使用特定记录,但由于我不希望模式 B 存在于服务 A 中,反之亦然(解耦原因),我理想中寻找的是如下内容。

内部服务A:

@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='A'")
public void consumeInSpecificRecord(TypeACompiledAvroClass a){
// Some logic
}

@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='B'")
public void consumeInGenericRecord(GenericRecord b){
// Log and ignore (leave it for service B to process)
}

这是否可以实现,因为将 specific.avro.reader 标志设置为 true 会导致消息 B 的反序列化错误并将其设置为 false 将迫使我使用通用记录,即使对于我正在尝试的第一个流侦听器也是如此避免。

如果不可能,这里可以推荐什么其他替代解决方案?

Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1

根据 Gary 的评论,我可以想出一个自定义反序列化器,它可以根据 header 中的内容在通用记录和特定记录之间切换。这是 class A.

的代码
public class GenericSpecificFlexibleDeserializer implements ExtendedDeserializer<Object> {
  private static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
  private Map<String, ?> config;
  private KafkaAvroDeserializer specificKafkaAvroDeserializer;
  private KafkaAvroDeserializer genericKafkaAvroDeserializer;

  @Override
  public Object deserialize(String topic, Headers headers, byte[] data) {
    List<String> entityTypeHeader = new LinkedList<>();
    if (headers != null) {
      if (new String(headers.lastHeader("message_type").value()).equals("A")){
        return specificKafkaAvroDeserializer.deserialize(topic, data);
      }
    }
      return genericKafkaAvroDeserializer.deserialize(topic, data);
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.config = configs;
    HashMap<String, Object> specificConfig = new HashMap<>(config);
    specificConfig.put(SPECIFIC_AVRO_READER, true);

    HashMap<String, Object> genericConfig = new HashMap<>(config);
    genericConfig.put(SPECIFIC_AVRO_READER, false);

    specificKafkaAvroDeserializer = new KafkaAvroDeserializer();
    specificKafkaAvroDeserializer.configure(specificConfig, isKey);
    genericKafkaAvroDeserializer = new KafkaAvroDeserializer();
    genericKafkaAvroDeserializer.configure(genericConfig, isKey);
  }

  @Override
  public Object deserialize(String topic, byte[] data) {
    return deserialize(topic, null, data);
  }

  @Override
  public void close() {
    genericKafkaAvroDeserializer.close();
    specificKafkaAvroDeserializer.close();
  }
}

当然,您的应用配置中的 spring.kafka.cloud.stream.kafka.binder.configuration.value.deserializer 必须设置为 GenericSpecificFlexibleDeserializer