消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder

Manual Acknowledgement (Checkpointing) of Messages: Spring Cloud Stream Kenesis Binder

我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认才能处理某些超时情况。

对于 Kafka,我们使用 属性 autocommitoffset 来伪造并使用 ACKNOWLEDGMENT header 来处理手动确认。

我查看了 Spring Cloud Stream 的文档 通过以下内容: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

但找不到任何解决方案。任何指示都会非常有帮助。

经过一番搜索,找到了解决方案:

在 Kenesis 中,shard 相当于 partitioncheckpoint 相当于 offset

在应用程序 Yml 中:

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          consumer-in-0:
            consumer:
              checkpointMode: manual

检查点示例代码

 @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            System.out.println("message received : "+ message.getPayload());
            System.out.println("message headers : "+ message.getHeaders());
            Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
            checkPointer.checkpoint();
           
        };
    }

转介 Kenesis Consumer Binder Properties