.Net-Core 2.2 MassTransit.ConfigurationException: 状态机配置不正确

.Net-Core 2.2 MassTransit.ConfigurationException: The state machine was not properly configured

新手问题 - 我错过了什么?是否有可用的 dotnetcore 2.2 Saga 示例?

我有一个基本的端到端系统工作正常,消息在 docker-compose 中跨容器流动,但添加 Saga 似乎是一个挑战 -

问。我是否缺少调度程序依赖项?在 MassTransit 5.5.5 中,cfg.UseInMemoryMessageScheduler();不编译。

发生了一些奇怪的事情,我不得不将我的状态机明确标记为 ISaga

MassTransit.ConfigurationException:无法为 Model.WorkflowExecutionStateMachine 创建状态机连接器 ---> MassTransit.ConfigurationException:状态机配置不正确: workflowapi_1 | [失败] 未指定 ExecutingTask


    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);

        // Register MassTransit
        services.AddMassTransit(x =>
        {
            x.AddConsumer<WorkflowTaskConsumer>();

            // required?
            x.AddSaga<WorkflowExecutionSaga>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var rabbitMQHostName = $"rabbitmq://{configuration["RabbitMQHostName"]}";

                Console.Out.WriteLineAsync($"Starting Workflow Receiver... {rabbitMQHostName}/{QueueNames.ExeuteWorkflowTaskQueue}");

                var host = cfg.Host(new Uri(rabbitMQHostName), hostConfig =>
                {
                    hostConfig.Username("guest");
                    hostConfig.Password("guest");
                });

                // A basic message works OK
                cfg.ReceiveEndpoint(host, QueueNames.ExeuteWorkflowTaskQueue, ep =>
                {
                    ep.PrefetchCount = 1;
                    ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                    ep.ConfigureConsumer<WorkflowTaskConsumer>(provider);
                });

                // Doesn't like this
                cfg.ReceiveEndpoint(host, QueueNames.WorkflowStateMachineSagaQueueName, ep =>
                {
                    ep.PrefetchCount = 1;
                    ep.UseMessageRetry(mr => mr.Interval(1000, 2));
                    ep.StateMachineSaga(new WorkflowExecutionSaga(), new InMemorySagaRepository<WorkflowExecutionStateMachine>());
                });
            }));

            cfg.UseInMemoryMessageScheduler(); // doesn't compile!
        });
    }

巴士启动如下 -


    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        else
        {
            // The default HSTS value is 30 days. You may want to change this for production scenarios, 
            // see https://aka.ms/aspnetcore-hsts.
            app.UseHsts();
        }

        app.UseMvc();

        var bus = app.ApplicationServices.GetService<IBusControl>();
        var busHandle = TaskUtil.Await(() =>
        {
            return bus.StartAsync();
        });

        lifetime.ApplicationStopping.Register(() =>
        {
            busHandle.Stop();
        });
    }

异常详情为

未处理的异常:MassTransit.ConfigurationException:无法为 Rapid.Workflow 创建状态机连接器。Api。Model.WorkflowExecutionStateMachine ---> MassTransit.ConfigurationException:状态机没有正确配置: workflowapi_1 | [失败] 未指定 ExecutingTask workflowapi_1 |在 Automatonymous.StateMachineConfigurationResult.CompileResults(IEnumerable1 results) workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1.StateMachineEvents()+MoveNext() workflowapi_1 |在 System.Collections.Generic.List1.AddEnumerable(IEnumerable1 可枚举) workflowapi_1 |在System.Linq.Enumerable.ToList[TSource](IEnumerable1 source) workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1..ctor(SagaStateMachine1 stateMachine) workflowapi_1 | --- End of inner exception stack trace --- workflowapi_1 | at Automatonymous.StateMachineConnectors.StateMachineConnector1..ctor(SagaStateMachine1 stateMachine) workflowapi_1 | at Automatonymous.SagaConfigurators.StateMachineSagaConfigurator1..ctor(SagaStateMachine1 stateMachine, ISagaRepository1存储库,ISagaConfigurationObserver 观察者) workflowapi_1 |在 MassTransit.AutomatonymousReceiveEndpointExtensions.StateMachineSaga[TInstance](IReceiveEndpointConfigurator 配置器,SagaStateMachine1 stateMachine, ISagaRepository1 存储库,Action`1 配置) workflowapi_1 |在 Rapid.Workflow.Api.Startup.<>c.b__2_5(IRabbitMqReceiveEndpointConfigurator ep) 在 /src/Workflow.Api/Startup.cs:line 74

依赖关系是

<PackageReference Include="Automatonymous" Version="4.1.6" />
<PackageReference Include="MassTransit" Version="5.5.5" />
<PackageReference Include="MassTransit.RabbitMQ" Version="5.5.5" />
<PackageReference Include="MassTransit.AspNetCore" Version="5.5.5" />
<PackageReference Include="MassTransit.Automatonymous" Version="5.5.5" /> 
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="5.5.5" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />

感谢任何提示或想法 -

您需要更改使用 .AddStateMachineSaga 方法,而不是您在相关代码中使用的 .AddSaga 方法。

// required? - yes, but should be as shown here
x.AddSagaStateMachine<WorkflowExecutionSaga, WorkflowState>();

在这种情况下,状态机和状态机实例类型都是必需的。然后,在您的端点中,使用:

ep.ConfigureSaga<WorkflowState>(provider);

您还需要确保在容器中配置了 saga 存储库,这是为 MS DI/in-memory 使用以下方法完成的:

x.AddSingleton<ISagaRepository<WorkflowState>, InMemorySagaRepository<WorkflowState>>();

假设您的状态机没有损坏,这应该会让您顺利进行。如果仍然出现错误,请确保所有状态机事件等都已正确配置。

此外,您的状态机实例应实现:

public class WorkflowState :
    SagaStateMachineInstance

并且您的状态机不需要实现ISaga

public class WorkflowExecutionSaga :
    MassTransitStateMachine<WorkflowState>

出现此错误似乎是因为 Saga class 声明了一些尚未使用(但 public)的事件 - DOH!

解决方案是从 Saga 中删除未使用的事件...

// uncomment will fail! public Event<ISatelliteTaskRequest> UnusedEvent { get; private set; }

在查看了这个示例 https://github.com/selcukusta/masstransit-saga-implementation 并将我的 program.cs 切回到基础知识之后 - 我仍然遇到错误!所以,不是容器/IOC/启动问题。

接下来,在源代码中查找 MassTransit 错误消息 (https://github.com/MassTransit/MassTransit/blob/master/src/MassTransit.AutomatonymousIntegration/Configuration/StateMachineConnectors/StateMachineConnector.cs),我意识到相关代码可能反映在 Saga 的所有 public 成员上 -

因此,从 Saga 中删除未使用的事件 class 可以解决问题。