包含来自消费者的 ACK 的较小任务的任务
Tasks which contains smaller tasks with ACK from Consumer
我正在研究一些 POC project 并试图解决以下问题。
我有一个 Publisher 正在向队列发送消息:
bus.PublishAsync<IBaseScenario>(new TestScenario())
.ContinueWith(task =>
{
if (task.IsCompleted && !task.IsFaulted)
Console.WriteLine("TestScenario queued with success.");
else
Console.WriteLine(task.Exception.Message);
});
还有一些 Consumers 正在使用消息:
bus.SubscribeAsync<IBaseScenario>("test_1_consumer",
message => Task.Factory.StartNew(() =>
{
var testScenario = message as TestScenario;
var anotherTestScenario = message as AnotherTestScenario;
ResolveScenario(testScenario);
ResolveScenario(anotherTestScenario);
}).ContinueWith(task =>
{
if (task.IsCompleted && !task.IsFaulted)
Console.WriteLine("Task ended up with success.");
else
Console.WriteLine(task.Exception.Message);
}));
此时一切正常,但这是我想要实现的目标。
我的Message是某种包含步骤的Scenario,每个Scenario被发送到Queue,然后由Consumer维护。
每次在消费者网站上完成每个步骤时,我都想从消费者那里获得某种 ACK
信息发送给发布者(例如,如果它最终成功与否。
我还想了解有关哪个消费者收到消息的信息。
每个消息(场景)都应该被视为原子操作,所以不应该对不同的消费者执行步骤,如果某个步骤没有成功结束,那么整个场景应该被视为失败。
这 2 个要求是否可以使用以下架构来解决,还是我需要使用更多的东西?
最简单的方法是使用此处描述的 EasyNetQ 请求响应模型 https://github.com/EasyNetQ/EasyNetQ/wiki/Request-Response
在响应中,您可以放置处理消息的消费者的身份和场景的最终状态。如果一个场景在一条消息中发送,并且该场景包含所有必要的步骤,那么所有步骤都将由一个消费者处理。
也就是说,消息重复始终是一个问题,因为要么发送消息两次,要么消息在消费者失败后重新排队。如果一个场景永远不会被处理超过一次是至关重要的,那么您将需要实施消息重复数据删除或使每个场景幂等。这是使用 RabbitMQ 时的普遍事实。
我正在研究一些 POC project 并试图解决以下问题。
我有一个 Publisher 正在向队列发送消息:
bus.PublishAsync<IBaseScenario>(new TestScenario())
.ContinueWith(task =>
{
if (task.IsCompleted && !task.IsFaulted)
Console.WriteLine("TestScenario queued with success.");
else
Console.WriteLine(task.Exception.Message);
});
还有一些 Consumers 正在使用消息:
bus.SubscribeAsync<IBaseScenario>("test_1_consumer",
message => Task.Factory.StartNew(() =>
{
var testScenario = message as TestScenario;
var anotherTestScenario = message as AnotherTestScenario;
ResolveScenario(testScenario);
ResolveScenario(anotherTestScenario);
}).ContinueWith(task =>
{
if (task.IsCompleted && !task.IsFaulted)
Console.WriteLine("Task ended up with success.");
else
Console.WriteLine(task.Exception.Message);
}));
此时一切正常,但这是我想要实现的目标。
我的Message是某种包含步骤的Scenario,每个Scenario被发送到Queue,然后由Consumer维护。
每次在消费者网站上完成每个步骤时,我都想从消费者那里获得某种
ACK
信息发送给发布者(例如,如果它最终成功与否。我还想了解有关哪个消费者收到消息的信息。
每个消息(场景)都应该被视为原子操作,所以不应该对不同的消费者执行步骤,如果某个步骤没有成功结束,那么整个场景应该被视为失败。
这 2 个要求是否可以使用以下架构来解决,还是我需要使用更多的东西?
最简单的方法是使用此处描述的 EasyNetQ 请求响应模型 https://github.com/EasyNetQ/EasyNetQ/wiki/Request-Response
在响应中,您可以放置处理消息的消费者的身份和场景的最终状态。如果一个场景在一条消息中发送,并且该场景包含所有必要的步骤,那么所有步骤都将由一个消费者处理。
也就是说,消息重复始终是一个问题,因为要么发送消息两次,要么消息在消费者失败后重新排队。如果一个场景永远不会被处理超过一次是至关重要的,那么您将需要实施消息重复数据删除或使每个场景幂等。这是使用 RabbitMQ 时的普遍事实。