spring cloud stream kafka - 函数式方法:生产墓碑记录时从不调用消费者

spring cloud stream kafka - Functional approach: Consumer is never called when producing tombstone records

问题与 https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455 几乎相同,但对于 Functional 方法

    @Bean
    public Consumer<Message<Foo>> fooConsumer() {
        return message -> {
            String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
            Foo payLoad = message.getPayload();

            if (payLoad == null) {
                deleteSomething(key);
            } else {
                addSomething(key, payLoad);
            }
        };
    }

当生成适当的 'Foo' (json) 时,将调用 fooConsumer。但是当生成 'null' payload/tombstone 时,永远不会调用消费者函数。

我尝试使用自定义 'Convertor' 方法的解决方法有效:

@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {

    @Override
    protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof KafkaNull) {
            return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
        }

        return super.convertFromInternal(message, targetClass, conversionHint);
    }
}

fooConsumer则可以检查payload.isDeleted()处理null的情况。 但这是冗长的,污染 pojo/model 类 并且必须为每个消费者重复。

我知道 spring-integration 不能使用 null。但是是否有任何 better/standard 方法来处理 'tombstone' 用函数方法的用例?

版本:3.0.4.RELEASE

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
   <version>3.0.4.RELEASE</version>
</dependency>

这是最近在主分支上修复的 https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/936 and https://github.com/spring-cloud/spring-cloud-function/issues/557