包含来自消费者的 ACK 的较小任务的任务

Tasks which contains smaller tasks with ACK from Consumer

我正在研究一些 POC project 并试图解决以下问题。

我有一个 Publisher 正在向队列发送消息:

bus.PublishAsync<IBaseScenario>(new TestScenario())
            .ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                    Console.WriteLine("TestScenario queued with success.");
                else
                    Console.WriteLine(task.Exception.Message);
            });

还有一些 Consumers 正在使用消息:

bus.SubscribeAsync<IBaseScenario>("test_1_consumer",
            message => Task.Factory.StartNew(() =>
            {
                var testScenario = message as TestScenario;
                var anotherTestScenario = message as AnotherTestScenario;

                ResolveScenario(testScenario);
                ResolveScenario(anotherTestScenario);

            }).ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                    Console.WriteLine("Task ended up with success.");
                else
                    Console.WriteLine(task.Exception.Message);
            }));

此时一切正常,但这是我想要实现的目标。

我的Message是某种包含步骤的Scenario,每个Scenario被发送到Queue,然后由Consumer维护。

  1. 每次在消费者网站上完成每个步骤时,我都想从消费者那里获得某种 ACK 信息发送给发布者(例如,如果它最终成功与否。

  2. 我还想了解有关哪个消费者收到消息的信息。

每个消息(场景)都应该被视为原子操作,所以不应该对不同的消费者执行步骤,如果某个步骤没有成功结束,那么整个场景应该被视为失败。

这 2 个要求是否可以使用以下架构来解决,还是我需要使用更多的东西?

最简单的方法是使用此处描述的 EasyNetQ 请求响应模型 https://github.com/EasyNetQ/EasyNetQ/wiki/Request-Response

在响应中,您可以放置​​处理消息的消费者的身份和场景的最终状态。如果一个场景在一条消息中发送,并且该场景包含所有必要的步骤,那么所有步骤都将由一个消费者处理。

也就是说,消息重复始终是一个问题,因为要么发送消息两次,要么消息在消费者失败后重新排队。如果一个场景永远不会被处理超过一次是至关重要的,那么您将需要实施消息重复数据删除或使每个场景幂等。这是使用 RabbitMQ 时的普遍事实。