RabbitMQ 保持同一组中作业的处理顺序
RabbitMQ keep processing order of jobs in a same group
我的应用程序队列 (RabbitMQ) 中有一个作业列表。
有些工作是分组在一起的,必须按顺序做。(不是连续的,而是按派遣时间的顺序)
例如,考虑队列中的这 4 个作业:
[
{ "group": "x", "dispatched_timestamp": 10001, "field1": "some data", "field2": "some other data"},
{ "group": "g", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "x", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "t", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"}
]
我必须确保组 "x" 中的第一个作业在第三个作业(同一组)之前成功执行。
但我不在乎第四个作业是否比第一个(或其他)执行得更快。
因为有时可能会发生所有三个作业都交付给 3 个消费者,但第一个作业由于某种原因失败(但第二个和第三个作业成功完成)。
我知道在这种情况下,有些情况下队列中的所有作业都属于同一组,因此多个消费者无法处理它们,他们必须一个一个地交付。没关系。
AMQ协议中没有这样的东西可以导致这个确切的解决方案,有一些方法可以解决这个问题。
- 为每个消息组定义队列
- 将并发设置为 1
让我引用文档中的消息排序
Section 4.7 of the AMQP 0-9-1 core specification explains the
conditions under which ordering is guaranteed: messages published in
one channel, passing through one exchange and one queue and one
outgoing channel will be received in the same order that they were
sent. RabbitMQ offers stronger guarantees since release 2.7.0.
参考:https://www.rabbitmq.com/semantics.html
对您而言,首要的事情是保持消息顺序,一旦我们对消息进行排序,我们就可以利用并发性按顺序处理消息。
假设您的队列有 5 条消息,如图所示
Queue: Queue1
+--------------+
Head-->|m1|m2|m3|m4|m5| <---- Tail
+--------------+
有竞争消费者的概念,竞争消费者意味着同一队列有多个 consumers/subscribers。如果有多个消费者,那么他们每个人都会 运行 自主,这意味着消费者端的排序将不会被保留。为了保持消费者端的顺序,我们不应该使用竞争消费者。
即使现在消费者没有竞争,如果我们有多个执行者,我们仍然会丢失消息顺序。不止一个执行者意味着我们可以轮询队列,将轮询的消息发送给任何执行者。基于 CPU 执行策略等,我们仍然会丢失顺序,所以现在我们需要将执行程序的数量限制为 1。
由于我们只有一个执行器,每条轮询消息都会按顺序执行,所以会变成串行执行。
对于队列 1
执行器会按照以下顺序消费消息
-> m1
-> m2
-> m3
-> m4
-> m5
还是少了一块,如果m1
执行失败怎么办?
您可以在使用下一条消息之前重试 N
次,以实现此目的,除非您已成功执行任何轮询消息,否则不要确认。
从设计的角度来看,这看起来不太好,因为您正在串行而不是并行地处理消息,尽管您没有任何其他选择。
我的应用程序队列 (RabbitMQ) 中有一个作业列表。
有些工作是分组在一起的,必须按顺序做。(不是连续的,而是按派遣时间的顺序)
例如,考虑队列中的这 4 个作业:
[
{ "group": "x", "dispatched_timestamp": 10001, "field1": "some data", "field2": "some other data"},
{ "group": "g", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "x", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"},
{ "group": "t", "dispatched_timestamp": 10005,"field1": "some data", "field2": "some other data"}
]
我必须确保组 "x" 中的第一个作业在第三个作业(同一组)之前成功执行。 但我不在乎第四个作业是否比第一个(或其他)执行得更快。
因为有时可能会发生所有三个作业都交付给 3 个消费者,但第一个作业由于某种原因失败(但第二个和第三个作业成功完成)。
我知道在这种情况下,有些情况下队列中的所有作业都属于同一组,因此多个消费者无法处理它们,他们必须一个一个地交付。没关系。
AMQ协议中没有这样的东西可以导致这个确切的解决方案,有一些方法可以解决这个问题。
- 为每个消息组定义队列
- 将并发设置为 1
让我引用文档中的消息排序
Section 4.7 of the AMQP 0-9-1 core specification explains the conditions under which ordering is guaranteed: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent. RabbitMQ offers stronger guarantees since release 2.7.0.
参考:https://www.rabbitmq.com/semantics.html
对您而言,首要的事情是保持消息顺序,一旦我们对消息进行排序,我们就可以利用并发性按顺序处理消息。
假设您的队列有 5 条消息,如图所示
Queue: Queue1
+--------------+
Head-->|m1|m2|m3|m4|m5| <---- Tail
+--------------+
有竞争消费者的概念,竞争消费者意味着同一队列有多个 consumers/subscribers。如果有多个消费者,那么他们每个人都会 运行 自主,这意味着消费者端的排序将不会被保留。为了保持消费者端的顺序,我们不应该使用竞争消费者。
即使现在消费者没有竞争,如果我们有多个执行者,我们仍然会丢失消息顺序。不止一个执行者意味着我们可以轮询队列,将轮询的消息发送给任何执行者。基于 CPU 执行策略等,我们仍然会丢失顺序,所以现在我们需要将执行程序的数量限制为 1。
由于我们只有一个执行器,每条轮询消息都会按顺序执行,所以会变成串行执行。
对于队列 1
执行器会按照以下顺序消费消息
-> m1
-> m2
-> m3
-> m4
-> m5
还是少了一块,如果m1
执行失败怎么办?
您可以在使用下一条消息之前重试 N
次,以实现此目的,除非您已成功执行任何轮询消息,否则不要确认。
从设计的角度来看,这看起来不太好,因为您正在串行而不是并行地处理消息,尽管您没有任何其他选择。