spring cloud stream kafka - 函数式方法:生产墓碑记录时从不调用消费者
spring cloud stream kafka - Functional approach: Consumer is never called when producing tombstone records
问题与 https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455 几乎相同,但对于 Functional
方法
@Bean
public Consumer<Message<Foo>> fooConsumer() {
return message -> {
String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
Foo payLoad = message.getPayload();
if (payLoad == null) {
deleteSomething(key);
} else {
addSomething(key, payLoad);
}
};
}
当生成适当的 'Foo' (json) 时,将调用 fooConsumer。但是当生成 'null' payload/tombstone 时,永远不会调用消费者函数。
我尝试使用自定义 'Convertor' 方法的解决方法有效:
@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {
@Override
protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
if (message.getPayload() instanceof KafkaNull) {
return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
}
return super.convertFromInternal(message, targetClass, conversionHint);
}
}
fooConsumer
则可以检查payload.isDeleted()
处理null
的情况。
但这是冗长的,污染 pojo/model 类 并且必须为每个消费者重复。
我知道 spring-integration 不能使用 null。但是是否有任何 better/standard 方法来处理 'tombstone' 用函数方法的用例?
版本:3.0.4.RELEASE
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.4.RELEASE</version>
</dependency>
问题与 https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455 几乎相同,但对于 Functional
方法
@Bean
public Consumer<Message<Foo>> fooConsumer() {
return message -> {
String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
Foo payLoad = message.getPayload();
if (payLoad == null) {
deleteSomething(key);
} else {
addSomething(key, payLoad);
}
};
}
当生成适当的 'Foo' (json) 时,将调用 fooConsumer。但是当生成 'null' payload/tombstone 时,永远不会调用消费者函数。
我尝试使用自定义 'Convertor' 方法的解决方法有效:
@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {
@Override
protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
if (message.getPayload() instanceof KafkaNull) {
return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
}
return super.convertFromInternal(message, targetClass, conversionHint);
}
}
fooConsumer
则可以检查payload.isDeleted()
处理null
的情况。
但这是冗长的,污染 pojo/model 类 并且必须为每个消费者重复。
我知道 spring-integration 不能使用 null。但是是否有任何 better/standard 方法来处理 'tombstone' 用函数方法的用例?
版本:3.0.4.RELEASE
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.4.RELEASE</version>
</dependency>