Spring Cloud Stream - 如果条件满足如何在 SpecificRecord 中读取,否则如何在 GenericRecord 中读取
Spring Cloud Stream - How to read in SpecificRecord if condition met and in GenericRecord otherwise
当从 Kafka 输入主题读取具有其唯一 Avro 架构的消息类型 A 时,我有服务 A 执行操作 A,而当从同一主题读取消息类型 B 时,服务 B 执行操作 B。
我不希望服务 A 知道 B 的架构,反之亦然。如果我有两个单独的主题,我会轻松地将 spring.kafka.properties.specific.avro.reader
设置为 true
并使用特定记录,但由于我不希望模式 B 存在于服务 A 中,反之亦然(解耦原因),我理想中寻找的是如下内容。
内部服务A:
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='A'")
public void consumeInSpecificRecord(TypeACompiledAvroClass a){
// Some logic
}
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='B'")
public void consumeInGenericRecord(GenericRecord b){
// Log and ignore (leave it for service B to process)
}
这是否可以实现,因为将 specific.avro.reader
标志设置为 true 会导致消息 B 的反序列化错误并将其设置为 false 将迫使我使用通用记录,即使对于我正在尝试的第一个流侦听器也是如此避免。
如果不可能,这里可以推荐什么其他替代解决方案?
Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
根据 Gary 的评论,我可以想出一个自定义反序列化器,它可以根据 header 中的内容在通用记录和特定记录之间切换。这是 class A
.
的代码
public class GenericSpecificFlexibleDeserializer implements ExtendedDeserializer<Object> {
private static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
private Map<String, ?> config;
private KafkaAvroDeserializer specificKafkaAvroDeserializer;
private KafkaAvroDeserializer genericKafkaAvroDeserializer;
@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
List<String> entityTypeHeader = new LinkedList<>();
if (headers != null) {
if (new String(headers.lastHeader("message_type").value()).equals("A")){
return specificKafkaAvroDeserializer.deserialize(topic, data);
}
}
return genericKafkaAvroDeserializer.deserialize(topic, data);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.config = configs;
HashMap<String, Object> specificConfig = new HashMap<>(config);
specificConfig.put(SPECIFIC_AVRO_READER, true);
HashMap<String, Object> genericConfig = new HashMap<>(config);
genericConfig.put(SPECIFIC_AVRO_READER, false);
specificKafkaAvroDeserializer = new KafkaAvroDeserializer();
specificKafkaAvroDeserializer.configure(specificConfig, isKey);
genericKafkaAvroDeserializer = new KafkaAvroDeserializer();
genericKafkaAvroDeserializer.configure(genericConfig, isKey);
}
@Override
public Object deserialize(String topic, byte[] data) {
return deserialize(topic, null, data);
}
@Override
public void close() {
genericKafkaAvroDeserializer.close();
specificKafkaAvroDeserializer.close();
}
}
当然,您的应用配置中的 spring.kafka.cloud.stream.kafka.binder.configuration.value.deserializer
必须设置为 GenericSpecificFlexibleDeserializer
。
当从 Kafka 输入主题读取具有其唯一 Avro 架构的消息类型 A 时,我有服务 A 执行操作 A,而当从同一主题读取消息类型 B 时,服务 B 执行操作 B。
我不希望服务 A 知道 B 的架构,反之亦然。如果我有两个单独的主题,我会轻松地将 spring.kafka.properties.specific.avro.reader
设置为 true
并使用特定记录,但由于我不希望模式 B 存在于服务 A 中,反之亦然(解耦原因),我理想中寻找的是如下内容。
内部服务A:
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='A'")
public void consumeInSpecificRecord(TypeACompiledAvroClass a){
// Some logic
}
@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='B'")
public void consumeInGenericRecord(GenericRecord b){
// Log and ignore (leave it for service B to process)
}
这是否可以实现,因为将 specific.avro.reader
标志设置为 true 会导致消息 B 的反序列化错误并将其设置为 false 将迫使我使用通用记录,即使对于我正在尝试的第一个流侦听器也是如此避免。
如果不可能,这里可以推荐什么其他替代解决方案?
Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
根据 Gary 的评论,我可以想出一个自定义反序列化器,它可以根据 header 中的内容在通用记录和特定记录之间切换。这是 class A
.
public class GenericSpecificFlexibleDeserializer implements ExtendedDeserializer<Object> {
private static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
private Map<String, ?> config;
private KafkaAvroDeserializer specificKafkaAvroDeserializer;
private KafkaAvroDeserializer genericKafkaAvroDeserializer;
@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
List<String> entityTypeHeader = new LinkedList<>();
if (headers != null) {
if (new String(headers.lastHeader("message_type").value()).equals("A")){
return specificKafkaAvroDeserializer.deserialize(topic, data);
}
}
return genericKafkaAvroDeserializer.deserialize(topic, data);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.config = configs;
HashMap<String, Object> specificConfig = new HashMap<>(config);
specificConfig.put(SPECIFIC_AVRO_READER, true);
HashMap<String, Object> genericConfig = new HashMap<>(config);
genericConfig.put(SPECIFIC_AVRO_READER, false);
specificKafkaAvroDeserializer = new KafkaAvroDeserializer();
specificKafkaAvroDeserializer.configure(specificConfig, isKey);
genericKafkaAvroDeserializer = new KafkaAvroDeserializer();
genericKafkaAvroDeserializer.configure(genericConfig, isKey);
}
@Override
public Object deserialize(String topic, byte[] data) {
return deserialize(topic, null, data);
}
@Override
public void close() {
genericKafkaAvroDeserializer.close();
specificKafkaAvroDeserializer.close();
}
}
当然,您的应用配置中的 spring.kafka.cloud.stream.kafka.binder.configuration.value.deserializer
必须设置为 GenericSpecificFlexibleDeserializer
。