为了确保并发性,同一组工作在多个队列中(FIFO)
To be sure about concurrency, same group of works in multiple queues (FIFO)
我有一个关于多消费者并发的问题。
我想将来自 Web 请求的作品发送到分布式队列的 rabbitmq。
我只想确定多个队列(FIFO)中的工作顺序。
因为这个请求来自不同的用户eech用户requests/works必须订购
我发现此功能在 Azure ServiceBus 和 ActiveMQ 消息分组上有不同的名称。
有什么方法可以在漂亮的 RabbitMQ 中做到这一点吗?
我想保证客户的要求必须相互订购。
每个客户可能有多个请求,但必须按顺序处理该客户的那些请求。
我希望通过在不同节点上使用多个消费者来快速处理传入的请求。
例如不同的客户 1 到 1000 发送超过 100 万的请求。
如果我把这个巨大的请求放在一个队列中,它会花费很多时间来消耗。所以我想在 n (5) 个节点之间分担这个进程负载。对于客户 X 的请求必须按相同的顺序进行处理
(假设 OP 正在询问诸如 ActiveMQs“消息分组:)
目前 RabbitMQ AFAIK 中没有内置此功能(根据 this answer,它不是 2013 年的),我现在也不知道(尽管我最近没有跟上) .
但是,RabbitMQ 的交换器和队列模型非常灵活 - 可以轻松地动态创建交换器和队列(这可以在其他消息系统中完成,但是,例如,如果您阅读 ActiveMQ 文档或 Red Hat AMQ 文档,您会发现用户指南中的 所有 示例都在系统启动时加载的配置文件中使用预先声明的队列 - 除了类似 RPC 的 request/response 通信)。
在 RabbitMQ 中,消费者(即消息消费线程)也很容易从多个队列消费。
因此,您可以在 RabbitMQ 之上构建一个系统,在该系统中您可以获得所需的分组语义。
一种方法是创建动态队列:第一次看到客户订单或一组新的客户订单时,将为该组的所有消息创建一个具有唯一名称的队列 - 该队列名称将是(通过另一个队列)传达给一个消费者,该消费者的唯一目的是在负责处理客户订单组的其他消费者之间进行负载平衡。即,负载均衡器将从其队列中拉出一条消息 "new group with queue name XYZ",它会在订单组消费者池中找到一个消费者,该消费者可以承受此负载并向其传递一条消息 "start listening to XYZ"。
另一种方法是使用 pub/sub 和主题路由 - 每个客户订单组将获得一个独特的主题 - 然后按上述方式进行。
在使用基于事件的系统时,尤其是在使用多个生产者 and/or 消费者时,重要的是要接受这样一个事实,即通常没有保证的事件顺序。为了获得一个健壮的系统,将系统设计成消息处理程序是幂等的也是明智的;他们应该容忍两次(或更多)收到相同的消息。
许多事情可能(实际上应该被允许)干扰秩序;
- 生产者传递消息的速度可能略有不同
- 一个生产者可能会错过一个确认(由于错过了包裹)并且会重新发送消息
- 一个消费者可能会获取并处理一条消息,但是在返回的过程中 ack 丢失了,因此消息被传递了两次(给另一个消费者)。
- 您的处理程序所依赖的某些其他服务可能已关闭,因此您必须拒绝该消息。
也就是说,NServicebus 等服务总线系统使用一种模式来强制执行消息的消费顺序。有一些要求:
- 您将需要一个允许条件更新的集中式存储(如sql服务器或文档存储);例如,你希望能够存储最后处理的消息的序列号(或者你在这个过程中走了多远),但是 only 如果已经存储 sequence/progress是 right/expected 一个。存储用户 ID 和进度,即使是数百万客户,对于大多数数据库来说应该是一个非常简单的操作。
- 您确保队列配置了 dead-letter-queue/exchange 用于重试,然后再次将您的原始队列设置为该队列的死信队列。
- 您在 retry/dead-letter-queue 上设置了 TTL(例如 30 秒)。这样,出现在死信队列中的消息将在超时后自动推回您的原始队列。
- 处理您的消息时,请检查您的 storage/database 是否处于处理消息的正确状态(即前面所需的步骤已经完成)。
- 如果您可以处理它,您就可以处理并更新存储(有条件地!)。
- If not - 您 nack 消息,以便它被扔到死信队列中。基本上你是说 "nah - I can't handle this message, there are probably some other message in the queue that should be handled first".
这种方式就是以正确的顺序处理大量消息。
但是如果发生了什么事情并且你收到一条带外消息,你将把它扔到重试队列(死信队列)并且 Rabbit 将确保它会回到队列中以便稍后重试.但只是在延迟之后。
这样做的美妙之处在于,您能够处理大多数可能会干扰消息处理的情况(消息乱序、相关服务关闭、您的处理程序在处理消息的过程中关闭) ) 以完全相同的方式;通过拒绝该消息并让您的基础架构 (Rabbit) 处理它并在一段时间后重试。
RabbitMQ 一致性哈希交换类型
我们正在使用 RabbitMQ,我们找到了一个插件。它使用 Consistent Hashing 算法按一致的密钥分发消息。
有关一致性哈希的更多信息;
https://en.wikipedia.org/wiki/Consistent_hashing
https://www.youtube.com/watch?v=viaNG1zyx1g
你可以从rabbitmq网页找到这个插件
插件:rabbitmq_consistent_hash_exchange
我有一个关于多消费者并发的问题。 我想将来自 Web 请求的作品发送到分布式队列的 rabbitmq。 我只想确定多个队列(FIFO)中的工作顺序。 因为这个请求来自不同的用户eech用户requests/works必须订购
我发现此功能在 Azure ServiceBus 和 ActiveMQ 消息分组上有不同的名称。
有什么方法可以在漂亮的 RabbitMQ 中做到这一点吗?
我想保证客户的要求必须相互订购。 每个客户可能有多个请求,但必须按顺序处理该客户的那些请求。 我希望通过在不同节点上使用多个消费者来快速处理传入的请求。 例如不同的客户 1 到 1000 发送超过 100 万的请求。 如果我把这个巨大的请求放在一个队列中,它会花费很多时间来消耗。所以我想在 n (5) 个节点之间分担这个进程负载。对于客户 X 的请求必须按相同的顺序进行处理
(假设 OP 正在询问诸如 ActiveMQs“消息分组:)
目前 RabbitMQ AFAIK 中没有内置此功能(根据 this answer,它不是 2013 年的),我现在也不知道(尽管我最近没有跟上) .
但是,RabbitMQ 的交换器和队列模型非常灵活 - 可以轻松地动态创建交换器和队列(这可以在其他消息系统中完成,但是,例如,如果您阅读 ActiveMQ 文档或 Red Hat AMQ 文档,您会发现用户指南中的 所有 示例都在系统启动时加载的配置文件中使用预先声明的队列 - 除了类似 RPC 的 request/response 通信)。
在 RabbitMQ 中,消费者(即消息消费线程)也很容易从多个队列消费。
因此,您可以在 RabbitMQ 之上构建一个系统,在该系统中您可以获得所需的分组语义。
一种方法是创建动态队列:第一次看到客户订单或一组新的客户订单时,将为该组的所有消息创建一个具有唯一名称的队列 - 该队列名称将是(通过另一个队列)传达给一个消费者,该消费者的唯一目的是在负责处理客户订单组的其他消费者之间进行负载平衡。即,负载均衡器将从其队列中拉出一条消息 "new group with queue name XYZ",它会在订单组消费者池中找到一个消费者,该消费者可以承受此负载并向其传递一条消息 "start listening to XYZ"。
另一种方法是使用 pub/sub 和主题路由 - 每个客户订单组将获得一个独特的主题 - 然后按上述方式进行。
在使用基于事件的系统时,尤其是在使用多个生产者 and/or 消费者时,重要的是要接受这样一个事实,即通常没有保证的事件顺序。为了获得一个健壮的系统,将系统设计成消息处理程序是幂等的也是明智的;他们应该容忍两次(或更多)收到相同的消息。
许多事情可能(实际上应该被允许)干扰秩序;
- 生产者传递消息的速度可能略有不同
- 一个生产者可能会错过一个确认(由于错过了包裹)并且会重新发送消息
- 一个消费者可能会获取并处理一条消息,但是在返回的过程中 ack 丢失了,因此消息被传递了两次(给另一个消费者)。
- 您的处理程序所依赖的某些其他服务可能已关闭,因此您必须拒绝该消息。
也就是说,NServicebus 等服务总线系统使用一种模式来强制执行消息的消费顺序。有一些要求:
- 您将需要一个允许条件更新的集中式存储(如sql服务器或文档存储);例如,你希望能够存储最后处理的消息的序列号(或者你在这个过程中走了多远),但是 only 如果已经存储 sequence/progress是 right/expected 一个。存储用户 ID 和进度,即使是数百万客户,对于大多数数据库来说应该是一个非常简单的操作。
- 您确保队列配置了 dead-letter-queue/exchange 用于重试,然后再次将您的原始队列设置为该队列的死信队列。
- 您在 retry/dead-letter-queue 上设置了 TTL(例如 30 秒)。这样,出现在死信队列中的消息将在超时后自动推回您的原始队列。
- 处理您的消息时,请检查您的 storage/database 是否处于处理消息的正确状态(即前面所需的步骤已经完成)。
- 如果您可以处理它,您就可以处理并更新存储(有条件地!)。
- If not - 您 nack 消息,以便它被扔到死信队列中。基本上你是说 "nah - I can't handle this message, there are probably some other message in the queue that should be handled first".
这种方式就是以正确的顺序处理大量消息。 但是如果发生了什么事情并且你收到一条带外消息,你将把它扔到重试队列(死信队列)并且 Rabbit 将确保它会回到队列中以便稍后重试.但只是在延迟之后。
这样做的美妙之处在于,您能够处理大多数可能会干扰消息处理的情况(消息乱序、相关服务关闭、您的处理程序在处理消息的过程中关闭) ) 以完全相同的方式;通过拒绝该消息并让您的基础架构 (Rabbit) 处理它并在一段时间后重试。
RabbitMQ 一致性哈希交换类型
我们正在使用 RabbitMQ,我们找到了一个插件。它使用 Consistent Hashing 算法按一致的密钥分发消息。
有关一致性哈希的更多信息;
https://en.wikipedia.org/wiki/Consistent_hashing
https://www.youtube.com/watch?v=viaNG1zyx1g
你可以从rabbitmq网页找到这个插件
插件:rabbitmq_consistent_hash_exchange