MassTransit Step1FinishedEvent Event 事件在 ProcessingStartedState 状态期间未处理为 StateMachine 状态机错误

MassTransit The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the StateMachine state machine error

我正在尝试使用 MassTransit 状态机制作一个完整的示例,用来编排完全解耦的服务,但遇到了一个异常:The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the ArcStateMachine state machine error。在调试过程中我发现,(由消费者使用的)消息触发状态机事件的时机似乎太晚了。

我的定义:

/// <summary>
/// State machine processing instance: the per-workflow data that
/// <c>ArcStateMachine</c> (a <c>MassTransitStateMachine&lt;ArcProcess&gt;</c>) operates on.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Identifier of this instance (presumably the member required by <c>SagaStateMachineInstance</c> — confirm).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state; the state machine binds it via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version number (presumably the member required by <c>ISagaVersion</c> — confirm).</summary>
    public int Version { get; set; }

    /// <summary>Activation id copied from the incoming message's <c>ActivationId</c> by every event handler of the state machine.</summary>
    public Guid ActivationId { get; set; }
}

/// <summary>
/// Registers, once per process, which property of each workflow message is used
/// as the correlation id when the message is sent.
/// </summary>
public static class MessageContracts
{
    // Set after the first successful run so later calls become no-ops.
    static bool s_initialized;

    /// <summary>
    /// Maps <c>ActivationId</c> to the correlation id for every message type exchanged
    /// with the state machine. Calling it more than once has no further effect.
    /// </summary>
    public static void Initialize()
    {
        if (!s_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep1Message>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step1FinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<StartStep2Message>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<Step2FinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            s_initialized = true;
        }
    }
}

/// <summary>
/// Consumer definition for <see cref="Step1Consumer"/>: keeps faulted messages
/// from being routed to the RabbitMQ <c>_error</c> queue by discarding them.
/// </summary>
public class Step1ConsumerDefinition : ConsumerDefinition<Step1Consumer>
{
    /// <summary>Configures the receive endpoint of <see cref="Step1Consumer"/> to discard faulted messages.</summary>
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Step1Consumer> consumerConfigurator)
        => endpointConfigurator.DiscardFaultedMessages();
}

我的2个实际处理消费者:

/// <summary>
/// Performs the (simulated) work of step 1: waits for a fixed delay after receiving
/// <c>StartStep1Message</c> and then publishes <c>Step1FinishedMessage</c> with the same activation id.
/// </summary>
public class Step1Consumer : IConsumer<StartStep1Message>
{
    readonly ILogger<Step1Consumer> _Logger;

    // Duration of the simulated work, in seconds.
    private readonly int _DelaySeconds = 5;

    public Step1Consumer(ILogger<Step1Consumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Handles the start command for step 1 and announces its completion.</summary>
    /// <param name="context">Consume context carrying the <c>StartStep1Message</c>.</param>
    public async Task Consume(ConsumeContext<StartStep1Message> context)
    {
        var activationId = context.Message.ActivationId;

        _Logger.LogInformation($"Step 1 started: {activationId}");

        await Task.Delay(TimeSpan.FromSeconds(_DelaySeconds));

        // Fixed: this previously logged "Step 2 finished", which made the trace of
        // the workflow misleading while debugging the message flow.
        _Logger.LogInformation($"Step 1 finished: {activationId}");

        await context.Publish<Step1FinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Performs the (simulated) work of step 2: waits for a fixed delay after receiving
/// <c>StartStep2Message</c> and then publishes <c>Step2FinishedMessage</c> with the same activation id.
/// </summary>
public class Step2Consumer : IConsumer<StartStep2Message>
{
    // Duration of the simulated work, in seconds.
    private readonly int _DelaySeconds = 1;

    readonly ILogger<Step2Consumer> _Logger;

    public Step2Consumer(ILogger<Step2Consumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Handles the start command for step 2 and announces its completion.</summary>
    /// <param name="context">Consume context carrying the <c>StartStep2Message</c>.</param>
    public async Task Consume(ConsumeContext<StartStep2Message> context)
    {
        var id = context.Message.ActivationId;
        var workDuration = TimeSpan.FromSeconds(_DelaySeconds);

        _Logger.LogInformation($"Step 2 started {id}");
        await Task.Delay(workDuration);
        _Logger.LogInformation($"Step 2 finished {id}");

        await context.Publish<Step2FinishedMessage>(new { ActivationId = id });
    }
}

我还有 2 个辅助消费者来协调不同服务之间的消息转换,以解耦它们并检测所有处理的完成:

/// <summary>
/// Coordinates the transitions between workflow steps so that the step consumers
/// stay unaware of each other: every incoming message is answered by publishing
/// the message that starts the next stage, carrying the same activation id.
/// </summary>
public class TransitionConsumer : 
    IConsumer<StartProcessingMessage>, 
    IConsumer<Step1FinishedMessage>, 
    IConsumer<Step2FinishedMessage>
{
    readonly ILogger<TransitionConsumer> _Logger;

    // The unused _DelaySeconds field was removed: this consumer never delays.

    public TransitionConsumer(
        ILogger<TransitionConsumer> logger
        )
    {
        _Logger = logger;
    }

    /// <summary>Processing started: publishes <c>StartStep1Message</c>.</summary>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Started to Step 1: {activationId}");
        await context.Publish<StartStep1Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 1 finished: publishes <c>StartStep2Message</c>.</summary>
    public async Task Consume(ConsumeContext<Step1FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 1 to Step 2: {activationId}");
        await context.Publish<StartStep2Message>(new { ActivationId = activationId });
    }

    /// <summary>Step 2 finished: publishes <c>ProcessingFinishedMessage</c>.</summary>
    public async Task Consume(ConsumeContext<Step2FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 2 to Completion: {activationId}");
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Terminal consumer of the workflow: logs that processing finished for the
/// activation id carried by <c>ProcessingFinishedMessage</c>.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _log;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _log = logger;
    }

    /// <summary>Logs the completion; no further message is published.</summary>
    /// <param name="context">Consume context carrying the <c>ProcessingFinishedMessage</c>.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        var finishedId = context.Message.ActivationId;
        _log.LogInformation($"Finish {finishedId}");

        // Nothing asynchronous to do; the await keeps the method's async shape.
        await Task.CompletedTask;
    }
}

还有一个 Fault<> 消费者,用于处理可能来自 Step1Consumer 和 Step2Consumer 的所有故障:

/// <summary>
/// Handles the <c>Fault&lt;T&gt;</c> messages for the start commands of step 1 and step 2
/// and logs which step failed, for which activation id, and why.
/// </summary>
public class FaultConsumer : 
    IConsumer<Fault<StartStep1Message>>, 
    IConsumer<Fault<StartStep2Message>>
{
    readonly ILogger<FaultConsumer> _Logger;

    public FaultConsumer(ILogger<FaultConsumer> logger)
    {
        _Logger = logger;
    }

    /// <summary>Logs a failure of step 1.</summary>
    public Task Consume(ConsumeContext<Fault<StartStep1Message>> context)
    {
        LogError("Step 1", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    /// <summary>Logs a failure of step 2.</summary>
    public Task Consume(ConsumeContext<Fault<StartStep2Message>> context)
    {
        LogError("Step 2", context.Message.Message.ActivationId, context.Message.Exceptions);
        return Task.CompletedTask;
    }

    // Writes one error entry joining the messages of all reported exceptions.
    // Synchronous on purpose: the previous "async Task" version contained no await
    // (compiler warning CS1998) and only added state-machine overhead.
    private void LogError(string step, Guid activationId, ExceptionInfo[] exceptions)
    {
        var errorMessages = string.Join(", ", exceptions.Select(e => e.Message));

        // Failures are logged at Error level (previously Information) so they are
        // not hidden by log filters that drop informational entries.
        _Logger.LogError($"{step} failed for {activationId}, cause: {errorMessages}");
    }
}

这是状态机定义:

/// <summary>
/// State machine that tracks an <see cref="ArcProcess"/> through the workflow:
/// processing started, step 1 started/finished, step 2 started/finished, finalized.
/// Each state handles exactly one event; on that event the instance's
/// <c>ActivationId</c> is refreshed from the message and the machine moves on.
/// </summary>
public class ArcStateMachine : MassTransitStateMachine<ArcProcess>
{
    // Message correlation must be registered before any instance of the machine is built.
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    // States, in the order the workflow passes through them.
    public State ProcessingStartedState { get; }
    public State Step1StartedState { get; }
    public State Step1FinishedState { get; }
    public State Step2StartedState { get; }
    public State Step2FinishedState { get; }

    // Events, one per workflow message type.
    public Event<StartProcessingMessage> StartProcessingEvent { get; }
    public Event<StartStep1Message> Step1StartedEvent { get; }
    public Event<Step1FinishedMessage> Step1FinishedEvent { get; }
    public Event<StartStep2Message> Step2StartedEvent { get; }
    public Event<Step2FinishedMessage> Step2FinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }

    public ArcStateMachine()
    {
        // The current state is stored in ArcProcess.CurrentState.
        InstanceState(process => process.CurrentState);

        Initially(
            When(StartProcessingEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        During(ProcessingStartedState,
            When(Step1StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1StartedState));

        During(Step1StartedState,
            When(Step1FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step1FinishedState));

        During(Step1FinishedState,
            When(Step2StartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2StartedState));

        During(Step2StartedState,
            When(Step2FinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(Step2FinishedState));

        During(Step2FinishedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }
}

以及 MassTransit 的设置:

        // Broker address comes from configuration; everything below is registered
        // only when it is present (IsNotEmpty is a helper not shown here — presumably
        // a null/empty check; confirm).
        var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                // Saga state machine: named endpoint, instances stored in MongoDB
                // (database "mongo", collection "WorkflowState").
                cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
                    .Endpoint(e => e.Name = BusConstants.SagaQueue)
                    .MongoDbRepository(connectionString, r =>
                    {
                        r.DatabaseName = "mongo";
                        r.CollectionName = "WorkflowState";
                    });

                // Consumers; only Step1Consumer is registered with a definition
                // (Step1ConsumerDefinition, which discards faulted messages).
                cnf.AddConsumer(typeof(TransitionConsumer));
                cnf.AddConsumer(typeof(Step1Consumer), typeof(Step1ConsumerDefinition));
                cnf.AddConsumer(typeof(Step2Consumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
                cnf.AddConsumer(typeof(FaultConsumer));

                //cnf.AddMessageScheduler(schedulerEndpoint);

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    // NOTE(review): credentials are hard-coded; consider reading them from configuration.
                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username("guest");
                        hst.Password("guest");
                    });

                    //cfg.UseMessageScheduler(schedulerEndpoint);

                    // Let MassTransit set up the receive endpoints for the saga and consumers registered above.
                    cfg.ConfigureEndpoints(context);
                });
            });

            services.AddMassTransitHostedService();

            // NOTE(review): Swagger is registered inside the RabbitMQ check, so it is
            // skipped when no broker host is configured — confirm this is intended.
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

所以实际上消息流应该是这样的:

  1. 控制器发布 StartProcessingMessage,由 TransitionConsumer 使用,后者又发布 StartStep1Message
  2. Step1Consumer 获取消息,完成其工作并发布 Step1FinishedMessage
  3. TransitionConsumer 获取消息并发布 StartStep2Message.
  4. Step2Consumer 获取消息,执行其工作并发布 Step2FinishedMessage
  5. TransitionConsumer 获取消息并发布 ProcessingFinishedMessage
  6. ProcessingFinishedConsumer 使用 ProcessingFinishedMessage

在这种情况下,Step1Consumer 和 Step2Consumer 都不知道对方的存在,步骤之间的过渡由 TransitionConsumer 协调。与此同时,状态机会跟踪每条消息,并依次经过所有相应的状态。

问题从一开始就出现了:TransitionConsumer 在 ArcStateMachine 开始处理 StartProcessingEvent 之前就发布了 StartStep1Message,而我原以为状态机会先被触发。这导致状态机卡在 ProcessingStartedState。结果,在发布 Step1FinishedEvent 时,状态机并不处于 Step1StartedState,而它本应通过由 StartStep1Message 触发的 Step1StartedEvent 转换到该状态。

我该如何解决这个问题?

您应该为您的状态机创建一个 Saga Definition,以便您可以配置消息重试和内存中的发件箱。

在该定义中,将 retry/outbox 直接添加到接收端点,如下所示。

// Retry failed messages on the saga's receive endpoint; Interval(3,1000) presumably
// means up to 3 retries 1000 ms apart — confirm against the MassTransit retry docs.
endpointConfigurator.UseMessageRetry(r => r.Interval(3,1000));
// Enable the in-memory outbox on the same endpoint (added after retry, as the answer lists them).
endpointConfigurator.UseInMemoryOutbox();

这应该可以处理 saga 中的任何并发问题(消费者可能会接收消息、生成事件,并且在 saga 处理完触发该命令的事件之前,该事件就已被分派给 saga。是的,就是这么快)。