Spring Cloud Stream 如何防止应用程序实例接收重复消息?
How Spring Cloud Stream prevents the application’s instances from receiving duplicate messages?
Spring Cloud Stream基于At least once方法,这意味着在一些极少数情况下重复消息 可以到达端点。
Spring Cloud Stream 是否保留已接收消息的缓冲区?
Enterprise Integration Patterns 一书中的 IdempotentReceiver 建议:
将接收器设计为幂等接收器,可以安全地多次接收相同消息的接收器。
Spring Cloud Stream 是否控制消费者中的重复消息?
更新:
Spring Cloud Stream 中的一段说:
4.5.1. Durability
Consistent with the opinionated application model of Spring Cloud Stream, consumer group subscriptions are durable. That is, a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.
Anonymous subscriptions are non-durable by nature. For some binder implementations (such as RabbitMQ), it is possible to have non-durable group subscriptions.
In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate messages (unless that behavior is desired, which is unusual).
我认为您对 spring-cloud-stream 框架责任的假设是不正确的。
Spring-cloud-stream 简而言之是一个框架,负责连接和适配开发者提供的 producers/consumers 到 spring-cloud-stream binder(例如,Kafka、Rabbit、Kinesis 等)。
因此,连接到代理、从代理接收消息、反序列化、调用用户代码、序列化消息并将其发送回代理都在框架的职责范围内。所以你可以把它看作纯粹的基础设施。
你所描述的更多是一个应用程序问题,因为实际的接收器是用户将作为 spring-cloud-stream 开发体验的一部分开发的东西,因此幂等性的责任在于这样的用户。
此外,最重要的是,大多数代理已经通过确保特定消息仅被传递一次来处理幂等性(以某种方式)。也就是说,如果有人向此类代理发送相同的消息,它不会知道它是重复的,因此幂等性 and/or 重复数据删除的要求仍然有效,但正如您所看到的那样,考虑到数量,它并不是那么简单您对幂等性的理解可能与我的不同,因此我们的方法也可能不同。
最后一件事(部分是为了证明我的最后一点):可以安全地多次接收相同的消息。 - 这就是它所说的全部内容,但是 safely[=17 的作用是什么? =] 对你、对我、对其他人真的意味着什么?
如果您担心应用程序从代理接收和处理消息但在确认消息之前崩溃的情况,则可能会发生这种情况。 Spring cloud stream app starters 支持自动配置持久性 message metadata store which backs Spring Integration's IdempotentReceiverInterceptor. An example of this is in the SFTP source app starter。默认情况下,sftp 源使用内存中的元数据存储,因此它不会在重启后继续存在,但可以自定义为使用持久存储。
Spring Cloud Stream基于At least once方法,这意味着在一些极少数情况下重复消息 可以到达端点。
Spring Cloud Stream 是否保留已接收消息的缓冲区?
Enterprise Integration Patterns 一书中的 IdempotentReceiver 建议: 将接收器设计为幂等接收器,可以安全地多次接收相同消息的接收器。
Spring Cloud Stream 是否控制消费者中的重复消息?
更新:
Spring Cloud Stream 中的一段说:
4.5.1. Durability Consistent with the opinionated application model of Spring Cloud Stream, consumer group subscriptions are durable. That is, a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped. Anonymous subscriptions are non-durable by nature. For some binder implementations (such as RabbitMQ), it is possible to have non-durable group subscriptions. In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate messages (unless that behavior is desired, which is unusual).
我认为您对 spring-cloud-stream 框架责任的假设是不正确的。 Spring-cloud-stream 简而言之是一个框架,负责连接和适配开发者提供的 producers/consumers 到 spring-cloud-stream binder(例如,Kafka、Rabbit、Kinesis 等)。 因此,连接到代理、从代理接收消息、反序列化、调用用户代码、序列化消息并将其发送回代理都在框架的职责范围内。所以你可以把它看作纯粹的基础设施。
你所描述的更多是一个应用程序问题,因为实际的接收器是用户将作为 spring-cloud-stream 开发体验的一部分开发的东西,因此幂等性的责任在于这样的用户。 此外,最重要的是,大多数代理已经通过确保特定消息仅被传递一次来处理幂等性(以某种方式)。也就是说,如果有人向此类代理发送相同的消息,它不会知道它是重复的,因此幂等性 and/or 重复数据删除的要求仍然有效,但正如您所看到的那样,考虑到数量,它并不是那么简单您对幂等性的理解可能与我的不同,因此我们的方法也可能不同。 最后一件事(部分是为了证明我的最后一点):可以安全地多次接收相同的消息。 - 这就是它所说的全部内容,但是 safely[=17 的作用是什么? =] 对你、对我、对其他人真的意味着什么?
如果您担心应用程序从代理接收和处理消息但在确认消息之前崩溃的情况,则可能会发生这种情况。 Spring cloud stream app starters 支持自动配置持久性 message metadata store which backs Spring Integration's IdempotentReceiverInterceptor. An example of this is in the SFTP source app starter。默认情况下,sftp 源使用内存中的元数据存储,因此它不会在重启后继续存在,但可以自定义为使用持久存储。