在并行执行中对任务进行分组,以防止同时执行具有相同 groupingID 的消息
Grouping tasks within parallel execution to prevent messages with same groupingID being executed concurrently
我有一个消费者服务使用 EasyNetQ 订阅者从 RabbitMQ 队列中检索消息。每条消息都需要几十秒的时间来处理,我需要并行 运行 它们以确保我能跟上生产者的步伐。但是,每条消息都有一个属性,称之为groupingId。重要的是,具有相同 groupingId 的任务不会同时执行,因为这会导致资源冲突。
很可能有数百个 groupingId,在通常情况下,同一时间不会有太多邮件具有相同的 ID。然而,数据可能是突发的,导致同时发生数百个相同 ID 的集群。
我认为 TPL Dataflow 可能是一个不错的选择,但我对它不是很熟悉,也不确定如何使用它来实现我的需要。任何指导将不胜感激。
创建分组 ID 的字典并锁定它们。
首先,在某处创建字典,可能作为成员变量。
ConcurrentDictionary<int,object> _locks = new ConcurrentDictionary<int, object>();
当您需要处理消息时,使用此逻辑。
if (!_locks.ContainsKey(message.GroupingID))
{
_locks.TryAdd(message.GroupingID, new object());
}
lock (_locks[message.GroupingID])
{
ProcessMessage(message);
}
我有一个消费者服务使用 EasyNetQ 订阅者从 RabbitMQ 队列中检索消息。每条消息都需要几十秒的时间来处理,我需要并行 运行 它们以确保我能跟上生产者的步伐。但是,每条消息都有一个属性,称之为groupingId。重要的是,具有相同 groupingId 的任务不会同时执行,因为这会导致资源冲突。
很可能有数百个 groupingId,在通常情况下,同一时间不会有太多邮件具有相同的 ID。然而,数据可能是突发的,导致同时发生数百个相同 ID 的集群。
我认为 TPL Dataflow 可能是一个不错的选择,但我对它不是很熟悉,也不确定如何使用它来实现我的需要。任何指导将不胜感激。
创建分组 ID 的字典并锁定它们。
首先,在某处创建字典,可能作为成员变量。
ConcurrentDictionary<int,object> _locks = new ConcurrentDictionary<int, object>();
当您需要处理消息时,使用此逻辑。
if (!_locks.ContainsKey(message.GroupingID))
{
_locks.TryAdd(message.GroupingID, new object());
}
lock (_locks[message.GroupingID])
{
ProcessMessage(message);
}