Saga 处理无法投递的消息

Saga handling undeliverable messages

来自文档 here:

If the configuration of an endpoint changes, or if a message is mistakenly sent to an endpoint, it is possible that a message type is received that does not have any connected consumers. If this occurs, the message is moved to a _skipped queue (prefixed by the original queue name). The original message content is retained, and additional headers are added to indicate the host which moved the message.

当某些消息试图传递到 saga 但尚未创建 saga 实例时,我是否应该期望 saga 有相同的行为? 换句话说,如果想处理 saga 中的消息,但该消息不是创建 saga 的触发器。我希望在 _skipped 队列中看到这些消息,但实际上我没有在 _skipped 或 _error 队列中看到它们。我还可以看到消息已成功传送到 saga 队列并以某种方式成功使用,没有任何警告或错误。谁消费了消息?

已更新

状态机:

public class TestState : SagaStateMachineInstance, IVersionedSaga
{
    public Guid CorrelationId { get; set; }
    public Guid Id { get; set; }
    public string CurrentState { get; set; }
    public int Version { get; set; }
    public DateTime TimeStart { get; set; }
    public int Progress { get; set; }
}

public class TestStateMachine : MassTransitStateMachine<TestState>
{
    public TestStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => ProcessingStarted, x => x.CorrelateById(context => context.Message.Id));
        Event(() => ProgressUpdated, x => x.CorrelateById(context => context.Message.Id));
        Event(() => ProcessingFinished, x => x.CorrelateById(context => context.Message.Id));

        Initially(
            When(ProcessingStarted)
                .Then(context =>
                {
                    context.Instance.TimeStart = DateTime.Now;
                })
                .TransitionTo(Processing)
        );

        During(Processing,
            When(ProgressUpdated)
                .Then(context =>
                {
                    context.Instance.Progress = context.Data.Progress;
                }),
            When(ProcessingFinished)
                .TransitionTo(Processed)
                .ThenAsync(async context =>
                {
                    await context.Raise(AllDone);
                })
        );

        During(Processed,
            When(AllDone)
                .Then(context =>
                {
                    Log.Information($"Saga {context.Instance.Id} finished");
                })
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public State Processing { get; set; }
    public State Processed { get; set; }

    public Event AllDone { get; set; }

    public Event<ProcessingStarted> ProcessingStarted { get; set; }
    public Event<ProgressUpdated> ProgressUpdated { get; set; }
    public Event<ProcessingFinished> ProcessingFinished { get; set; }
}

public interface ProcessingStarted
{
    Guid Id { get; }
}

public interface ProgressUpdated
{
    Guid Id { get; }
    int Progress { get; }
}

public interface ProcessingFinished
{
    Guid Id { get; }
}

配置

var repository = new MongoDbSagaRepository<TestState>(Environment.ExpandEnvironmentVariables(configuration["MongoDb:ConnectionString"]), "sagas");

var services = new ServiceCollection();

services.AddSingleton(context => Bus.Factory.CreateUsingRabbitMq(x =>
{
    IRabbitMqHost host = x.Host(new Uri(Environment.ExpandEnvironmentVariables(configuration["MassTransit:ConnectionString"])), h => { });

    x.ReceiveEndpoint(host, "receiver_saga_queue", e =>
    {
        e.StateMachineSaga(new TestStateMachine(), repository);
    });

    x.UseRetry(r =>
    {
        r.Incremental(10, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50));
        r.Handle<MongoDbConcurrencyException>();
        r.Handle<UnhandledEventException>();
    });

    x.UseSerilog();
}));

var container = services.BuildServiceProvider();

var busControl = container.GetRequiredService<IBusControl>();

busControl.Start();

稍后,当我发布ProgressUpdated事件时

busControl.Publish<ProgressUpdated>(new
{
    Id = id,
    Progress = 50
});

我希望它会引发 UnhandledEventException 并将消息移至 _error 队列,但实际上我可以看到该消息出现在队列中,以某种方式被消耗,但我在 _error 或_skipped 队列。 MongoDB Saga 存储也没有创建任何新的 saga 实例。

我做错了什么?我需要配置 saga,如果它在未创建 saga 实例时获得 ProgressUpdated 事件,它将在稍后实例准备好接受它时重试。

解决方案

将事件 ProgressUpdatedProcessingFinished 重新配置为

Event(() => ProgressUpdated, x =>
{
    x.CorrelateById(context => context.Message.Id);

    x.OnMissingInstance(m => m.Fault());
});
Event(() => ProcessingFinished, x =>
{
    x.CorrelateById(context => context.Message.Id);

    x.OnMissingInstance(m => m.Fault());
});

并稍后在 UseRetry 中捕获 SagaException

r.Handle<SagaException>();

完成了!

感谢@Alexey Zimarev 分享知识!

如果消息应由 saga 处理但没有匹配的实例,则调用 OnMissingInstance 回调。

重新配置 ProgressUpdatedProcessingFinished:

Event(() => ProgressUpdated, x =>
{
    x.CorrelateById(context => context.Message.Id);
    x.OnMissingInstance(m => m.Fault());
});
Event(() => ProcessingFinished, x =>
{
    x.CorrelateById(context => context.Message.Id);
    x.OnMissingInstance(m => m.Fault());
});

并在重试过滤器中捕获 SagaException

r.Handle<SagaException>();