消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder
Manual Acknowledgement (Checkpointing) of Messages: Spring Cloud Stream Kenesis Binder
我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认才能处理某些超时情况。
对于 Kafka,我们使用 属性 autocommitoffset
来伪造并使用 ACKNOWLEDGMENT header 来处理手动确认。
我查看了 Spring Cloud Stream 的文档
通过以下内容:
https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/
https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
但找不到任何解决方案。任何指示都会非常有帮助。
经过一番搜索,找到了解决方案:
在 Kenesis 中,shard
相当于 partition
,checkpoint
相当于 offset
在应用程序 Yml 中:
spring:
cloud:
stream:
kinesis:
bindings:
consumer-in-0:
consumer:
checkpointMode: manual
检查点示例代码
@Bean
public Consumer<Message<String>> consume() {
return message -> {
System.out.println("message received : "+ message.getPayload());
System.out.println("message headers : "+ message.getHeaders());
Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
checkPointer.checkpoint();
};
}
我们正在尝试将使用来自 Kafka 的消息的 Spring Cloud Stream 应用程序移植到 AWS Kenesis。我们需要手动确认才能处理某些超时情况。
对于 Kafka,我们使用 属性 autocommitoffset
来伪造并使用 ACKNOWLEDGMENT header 来处理手动确认。
我查看了 Spring Cloud Stream 的文档 通过以下内容: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
但找不到任何解决方案。任何指示都会非常有帮助。
经过一番搜索,找到了解决方案:
在 Kenesis 中,shard
相当于 partition
,checkpoint
相当于 offset
在应用程序 Yml 中:
spring:
cloud:
stream:
kinesis:
bindings:
consumer-in-0:
consumer:
checkpointMode: manual
检查点示例代码
@Bean
public Consumer<Message<String>> consume() {
return message -> {
System.out.println("message received : "+ message.getPayload());
System.out.println("message headers : "+ message.getHeaders());
Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
checkPointer.checkpoint();
};
}