如何使用存储引擎持久化 Saga 实例并避免竞争条件

How to persist Saga instances using storage engines and avoid race condition

我尝试使用 RedisSagaRepository 持久化 Saga 实例;我想在负载平衡设置中使用 运行 Saga,所以我不能使用 InMemorySagaRepository。 但是,在切换之后,我注意到 Consumers 发布的一些事件没有被 Saga 处理。我检查了队列,没有看到任何消息。

我注意到,当消费者几乎没有时间处理命令和发布事件时,它很可能会发生。 如果我使用 InMemorySagaRepository 或在 Consumer.Consume()

中添加 Task.Delay() 则不会出现此问题

我是不是用错了?

此外,如果我想在负载平衡设置中 运行 Saga,并且如果 Saga 需要使用字典发送多个相同类型的命令来跟踪完整性(与 中的逻辑类似).当多个消费者同时发布事件时,如果两个 Sagas 同时处理两个不同的事件,我是否会出现竞争条件?在这种情况下,是否会正确设置 Dictionary in State 对象?

代码可用here

SagaService.ConfigureSagaEndPoint() 是我在 InMemorySagaRepositoryRedisSagaRepository

之间切换的地方
private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
    var stateMachine = new MySagaStateMachine();

    try

    {
        var redisConnectionString = "192.168.99.100:6379";
        var redis = ConnectionMultiplexer.Connect(redisConnectionString);

        ///If we switch to RedisSagaRepository and Consumer publish its response too quick,
        ///It seems like the consumer published event reached Saga instance before the state is updated
        ///When it happened, Saga will not process the response event because it is not in the "Processing" state
        //var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
        var repository = new InMemorySagaRepository<SagaState>();

        endpointConfigurator.StateMachineSaga(stateMachine, repository);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

LeafConsumer.Consume 是我们添加 Task.Delay()

的地方
public class LeafConsumer : IConsumer<IConsumerRequest>
{
    public async Task Consume(ConsumeContext<IConsumerRequest> context)
    {
        ///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
        ///Otherwise, it seems that the Publish message from Consumer will not be processed
        ///If using InMemorySagaRepository, code will work without needing Task.Delay
        ///Maybe I am doing something wrong here with these projects
        ///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
        ///However, we cannot predict latency between Saga and Redis
        //await Task.Delay(1000);

        Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
        await context.Publish<IConsumerProcessed>(new
        {
            context.Message.CorrelationId,
        });
    }
}

当您以这种方式发布事件,并且将多个服务实例与非事务性 saga 存储库(例如 Redis)一起使用时,您需要设计您的 saga,以便 Redis 使用和强制执行唯一标识符.这可以防止创建同一传奇的多个实例。

您还需要接受 "expected" 状态以上的事件。例如,期望在仅在处理中接收另一个事件之前接收到将 saga 置于 processing 状态的 Start,很可能会失败。建议允许 saga 由任何事件序列启动(最初,在 Automatonymous 中),以避免乱序消息传递问题。只要事件都将表盘从左向右移动,就会达到最终状态。如果在较晚的事件之后接收到较早的事件,则不应将状态向后移动(或在本例中向左移动),而应仅向 saga 实例添加信息并将其保留在较晚的状态。

如果在不同的服务实例上处理两个事件,它们都会尝试将 saga 实例插入 Redis,这将作为重复失败。然后消息应该重试(将 UseMessageRetry() 添加到您的接收端点),然后它将选择现在存在的传奇实例并应用事件。