如何解决队列多消费并发问题?
How to solve the queue multi-consuming concurrency problem?
我们的程序正在使用队列。
多个消费者正在处理消息。
消费者执行以下操作:
- 从队列接收 on 或 off 状态消息。
- 从存储库获取最新状态。
- 比较存储库的状态和从消息中收到的状态。
- 如果on/off状态不同,更新数据。 (此时其他相关数据也更新了。)
假设这个过程由多个消费者处理,预计会出现以下问题。
- Producer发送消息1:on,2:off,3:on。
- 消费者A收到消息#1,因为没有最新数据,所以将消息#1存储在存储中。
- 消费者 A 收到消息 #2。
- 此时,消费者B同时收到消息#3。
- 消费者A和B同时从存储中读取最新数据(消息1)。
- 消费者B先完成处理。不要更新存储库,因为 on/off 状态未更改。(1: on, 3: on)
- 然后消费者A完成处理。 on/off 状态已更改,因此它处理并保存工作。 (1:开,2:关)
正常情况下,DB中剩余的最新数据应该是on.
(这是因为消息是按照开->关->开的顺序发送的。)
但是按照上面的场景,off依然是最新的数据。
请问有什么好的方法可以解决这个问题吗?
作为参考,我们使用的队列使用 AWS Amazon MQ,存储使用 AWS dynamoDB。并使用 Spring Boot.
这里的根本问题是您需要按顺序消费这些“状态”消息,但您使用的是并发消费者,这会导致竞争条件和超出-订单消息处理。简而言之,您使用并发消费者的基本架构导致了这个问题。
您可以按照评论中的建议在数据库中使用时间戳制定某种解决方案,但这对客户来说是额外的工作,而且数据库中存储的额外数据并不是绝对必要的。
解决该问题的最简单方法是连续而不是同时使用消息。有几种不同的方法可以做到这一点,例如:
- 只为带有“状态”消息的队列定义 1 个使用者。
- 使用 ActiveMQ 的“exclusive consumer”功能来确保只有一个消费者接收消息。
- 使用 message groups 将所有“状态”消息组合在一起以确保它们被连续处理(即按顺序)。
我们的程序正在使用队列。 多个消费者正在处理消息。
消费者执行以下操作:
- 从队列接收 on 或 off 状态消息。
- 从存储库获取最新状态。
- 比较存储库的状态和从消息中收到的状态。
- 如果on/off状态不同,更新数据。 (此时其他相关数据也更新了。)
假设这个过程由多个消费者处理,预计会出现以下问题。
- Producer发送消息1:on,2:off,3:on。
- 消费者A收到消息#1,因为没有最新数据,所以将消息#1存储在存储中。
- 消费者 A 收到消息 #2。
- 此时,消费者B同时收到消息#3。
- 消费者A和B同时从存储中读取最新数据(消息1)。
- 消费者B先完成处理。不要更新存储库,因为 on/off 状态未更改。(1: on, 3: on)
- 然后消费者A完成处理。 on/off 状态已更改,因此它处理并保存工作。 (1:开,2:关)
正常情况下,DB中剩余的最新数据应该是on.
(这是因为消息是按照开->关->开的顺序发送的。)
但是按照上面的场景,off依然是最新的数据。
请问有什么好的方法可以解决这个问题吗? 作为参考,我们使用的队列使用 AWS Amazon MQ,存储使用 AWS dynamoDB。并使用 Spring Boot.
这里的根本问题是您需要按顺序消费这些“状态”消息,但您使用的是并发消费者,这会导致竞争条件和超出-订单消息处理。简而言之,您使用并发消费者的基本架构导致了这个问题。
您可以按照评论中的建议在数据库中使用时间戳制定某种解决方案,但这对客户来说是额外的工作,而且数据库中存储的额外数据并不是绝对必要的。
解决该问题的最简单方法是连续而不是同时使用消息。有几种不同的方法可以做到这一点,例如:
- 只为带有“状态”消息的队列定义 1 个使用者。
- 使用 ActiveMQ 的“exclusive consumer”功能来确保只有一个消费者接收消息。
- 使用 message groups 将所有“状态”消息组合在一起以确保它们被连续处理(即按顺序)。