MassTransit 消息类型不能是系统类型异常

MassTransit messages types must not be System types exception

我是 MassTransit 的新手,不明白我做错了什么导致出现以下异常:Messages types must not be System types.

这是我的定义:

/// <summary>
/// Saga instance state used by <c>ArcStateMachine</c>.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Identifier of this saga instance (presumably the member required by <c>SagaStateMachineInstance</c> — confirm).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance; the state machine binds it via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version number (presumably the member required by <c>ISagaVersion</c> — confirm).</summary>
    public int Version { get; set; }

    /// <summary>Activation id; the state machine copies it from the incoming event message.</summary>
    public Guid ActivationId { get; set; }
}

/// <summary>
/// Registers the correlation-id selectors for the message contracts.
/// </summary>
public static class MessageContracts
{
    // Set after the selectors have been registered so that later calls do nothing.
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id of each message contract.
    /// Only the first completed call has an effect.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ReconstructionFinishedMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            _initialized = true;
        }
    }
}

我的 2 个消费者是:

/// <summary>
/// Consumes <c>StartProcessingMessage</c>: logs, waits a fixed delay, logs again and then
/// publishes <c>ReconstructionFinishedMessage</c> with the same activation id.
/// </summary>
public class StartReconstructionConsumer : IConsumer<StartProcessingMessage>
{
    // Length of the pause between the two log entries, in seconds.
    private readonly int _DelaySeconds = 5;

    readonly ILogger<StartReconstructionConsumer> _Logger;

    public StartReconstructionConsumer(ILogger<StartReconstructionConsumer> logger) => _Logger = logger;

    /// <summary>Handles one <c>StartProcessingMessage</c>.</summary>
    /// <param name="context">Consume context carrying the message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        var pause = TimeSpan.FromSeconds(_DelaySeconds);

        _Logger.LogInformation($"Received Scan: {activationId}");
        await Task.Delay(pause);
        _Logger.LogInformation($"Finish Scan: {activationId}");

        // The anonymous object supplies the values for the published message contract.
        await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs its activation id.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _Logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger) => _Logger = logger;

    /// <summary>Logs the activation id of the received message.</summary>
    /// <param name="context">Consume context carrying the message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _Logger.LogInformation($"Finish {context.Message.ActivationId}");

        // Nothing asynchronous to do; awaiting a completed task keeps the method async.
        await Task.CompletedTask;
    }
}

这是状态机定义:

/// <summary>
/// State machine over <c>ArcProcess</c> instances, as posted in the question.
/// This is the version that produces the "Messages types must not be System types" fault.
/// </summary>
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
    // Static constructor: calls MessageContracts.Initialize() before the first instance is created.
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    public ArcStateMachine()
    {
        // The current state is kept in ArcProcess.CurrentState.
        InstanceState(x => x.CurrentState);

        // Initial event: store the activation id, then move to ProcessingStartedState.
        Initially(
            When(ProcessingStartedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ProcessingStartedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(ProcessingStartedState));

        // While in ProcessingStartedState: on ReconstructionFinishedEvent store the activation id,
        // publish ProcessingFinishedMessage, transition and finalize.
        During(ProcessingStartedState,
            When(ReconstructionFinishedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ReconstructionFinishedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            // NOTE(review): context.Init<T>(...) yields a Task<ProcessingFinishedMessage>, so Publish
            // ends up treating the Task itself as the message type. This matches the reported error
            // "Messages types must not be System types: System.Threading.Tasks.Task<...>".
            // The accepted fix is to call PublishAsync here instead of Publish.
            .Publish(context =>
            {
                return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
            })
            .TransitionTo(ProcessingFinishedState)
            .Finalize());
    }

    // States. Only ProcessingStartedState and ProcessingFinishedState are referenced by the constructor.
    public State ProcessingStartedState { get; }
    public State ReconstructionStartedState { get; }
    public State ReconstructionFinishedState { get; }
    public State ProcessingFinishedState { get; }

    // Events. Only ProcessingStartedEvent and ReconstructionFinishedEvent are handled by the constructor.
    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
    public Event<ReconstructionStartedMessage> ReconstructionStartedEvent { get; }
    public Event<ReconstructionFinishedMessage> ReconstructionFinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

MassTransit 的设置如下所示:

// Broker address read from configuration; everything below is skipped when the
// IsNotEmpty() check fails (presumably a null/empty test — confirm against the helper).
var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                // The state machine and its MongoDB saga repository are created by hand here and
                // attached to an explicitly declared receive endpoint further down.
                var machine = new ArcStateMachine();
                var repository = MongoDbSagaRepository<ArcProcess>.Create(connectionString,
                    "mongoRepo", "WorkflowState");

                cnf.AddConsumer(typeof(StartReconstructionConsumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    // NOTE(review): credentials are hard-coded; consider reading them from configuration.
                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username("guest");
                        hst.Password("guest");
                    });

                    // Endpoints for the registered consumers.
                    cfg.ConfigureEndpoints(context);

                    // Manually declared endpoint that hosts the saga state machine.
                    cfg.ReceiveEndpoint(BusConstants.SagaQueue,
                        e => e.StateMachineSaga(machine, repository));
                });
            });

            services.AddMassTransitHostedService();

            // NOTE(review): Swagger is registered only when a RabbitMQ host is configured,
            // because this call sits inside the same if-block — confirm that this is intended.
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

我有几个问题:

  1. 发布消息之后,事件实际上是在什么时候被发布的?例如,在我的示例中,await _BusInstance.Bus.Publish<StartProcessingMessage>(new { ActivationId = id }); 是从 WebApi 调用的,该消息由 StartReconstructionConsumer 消费,但状态机实际上是在什么时候开始执行 Initially(When(ProcessingStartedEvent)... 的?

  2. 我的处理流程应该确保状态机已经处于 ProcessingStartedState 状态,这样 During(ProcessingStartedState, When(ReconstructionFinishedEvent)... 才能正确执行。那么,我如何确保在收到 StartProcessingMessage 时触发的消费者能够发布 ReconstructionFinishedMessage,并由它触发 During 分支?我构建消息交换的方式正确吗?

  3. 目前,对于 await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });,我在日志中看到一个异常:R-FAULT rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore.Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604),其中的 guid 实际上是 MessageId。RabbitMQ 中的这条消息被路由到了 saga.service_error,并带有异常 Messages types must not be System types: System.Threading.Tasks.Task<Nanox.GC.Shared.AppCore.Messages.ProcessingFinishedMessage> (Parameter 'T')。

看来我这里遗漏了很重要的东西……

我的意图是启动处理,该处理将由几个消费者按顺序处理多个阶段。所以在这里我尝试构建一个简单的状态机,只要有人调用 StartProcessing,它就会启动,然后每个消费者都会完成自己的工作并触发 FinishedStepX,这会将状态机提升到一个新的步骤并启动下一个消费者直到所有处理完成并且状态机将报告 ProcessingComplete.

感谢您提前提供的任何帮助

首先,您的总线配置有点奇怪,所以我已经清理了它:

services.AddMassTransit(cnf =>
{
    var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

    // Register the saga state machine on the registration configurator (cnf).
    // The bus configurator (cfg) only exists inside the UsingRabbitMq callback below,
    // so it cannot be used at this point.
    cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
        .Endpoint(e => e.Name = BusConstants.SagaQueue)
        .MongoDbRepository(connectionString, r =>
        {
            r.DatabaseName = "mongoRepo";
            r.CollectionName = "WorkflowState";
        });

    cnf.AddConsumer<StartReconstructionConsumer>();
    cnf.AddConsumer<ProcessingFinishedConsumer>();

    cnf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), hst =>
        {
            hst.Username("guest");
            hst.Password("guest");
        });

        // Creates the receive endpoints for everything registered above
        // (the saga and both consumers), so no manual ReceiveEndpoint call is needed.
        cfg.ConfigureEndpoints(context);
    });
});

并且发布问题与正在使用的方法有关,只有 PublishAsync 允许使用消息初始值设定项:

// Handles ReconstructionFinishedEvent while the instance is in ProcessingStartedState.
During(ProcessingStartedState,
    When(ReconstructionFinishedEvent)
        .Then(context =>
        {
            Console.WriteLine(">> ReconstructionFinishedEvent");
            context.Instance.ActivationId = context.Data.ActivationId;
        })
        // PublishAsync (rather than Publish) is required with a message initializer:
        // context.Init<T>(...) yields a Task<ProcessingFinishedMessage>, and with Publish
        // the Task itself was treated as the message type.
        .PublishAsync(context =>
        {
            return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
        })
        .TransitionTo(ProcessingFinishedState)
        .Finalize());

这应该可以解决你的问题。

在@Chris Patterson 的慷慨帮助下,可行的解决方案是:

定义:

/// <summary>
/// Saga instance state used by <c>ArcStateMachine</c> and stored through the MongoDB saga repository.
/// </summary>
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    /// <summary>Identifier of this saga instance (presumably the member required by <c>SagaStateMachineInstance</c> — confirm).</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance; the state machine binds it via <c>InstanceState(x => x.CurrentState)</c>.</summary>
    public string CurrentState { get; set; }

    /// <summary>Version number (presumably the member required by <c>ISagaVersion</c> — confirm).</summary>
    public int Version { get; set; }

    /// <summary>Activation id; the state machine copies it from the incoming event message.</summary>
    public Guid ActivationId { get; set; }
}

/// <summary>
/// Message contract that starts processing; it is the data type of <c>ProcessingStartedEvent</c>
/// and is consumed by <c>StartProcessingConsumer</c>.
/// </summary>
public interface StartProcessingMessage
{
    /// <summary>Activation id; registered as the correlation id in <c>MessageContracts.Initialize</c>.</summary>
    Guid ActivationId { get; }
}

/// <summary>
/// Message contract published by <c>StartProcessingConsumer</c> when its work is done;
/// it is the data type of <c>ProcessingFinishedEvent</c>.
/// </summary>
public interface ProcessingFinishedMessage
{
    /// <summary>Activation id; registered as the correlation id in <c>MessageContracts.Initialize</c>.</summary>
    Guid ActivationId { get; }
}

/// <summary>
/// Registers the correlation-id selectors for the message contracts.
/// </summary>
public static class MessageContracts
{
    // Set after the selectors have been registered so that later calls do nothing.
    static bool _initialized;

    /// <summary>
    /// Registers <c>ActivationId</c> as the correlation id of both message contracts.
    /// Only the first completed call has an effect.
    /// </summary>
    public static void Initialize()
    {
        if (!_initialized)
        {
            GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(message => message.ActivationId);
            GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(message => message.ActivationId);

            _initialized = true;
        }
    }
}

消费者:

/// <summary>
/// Consumes <c>StartProcessingMessage</c>: logs, waits a fixed delay, logs again and then
/// publishes <c>ProcessingFinishedMessage</c> with the same activation id.
/// </summary>
public class StartProcessingConsumer : IConsumer<StartProcessingMessage>
{
    // Length of the pause between the two log entries, in seconds.
    private readonly int _DelaySeconds = 5;

    readonly ILogger<StartProcessingConsumer> _Logger;

    public StartProcessingConsumer(ILogger<StartProcessingConsumer> logger) => _Logger = logger;

    /// <summary>Handles one <c>StartProcessingMessage</c>.</summary>
    /// <param name="context">Consume context carrying the message.</param>
    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        var pause = TimeSpan.FromSeconds(_DelaySeconds);

        _Logger.LogInformation($"Received Scan: {activationId}");
        await Task.Delay(pause);
        _Logger.LogInformation($"Finish Scan: {activationId}");

        // The anonymous object supplies the values for the published message contract.
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}

/// <summary>
/// Consumes <c>ProcessingFinishedMessage</c> and logs its activation id.
/// </summary>
public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _Logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger) => _Logger = logger;

    /// <summary>Logs the activation id of the received message.</summary>
    /// <param name="context">Consume context carrying the message.</param>
    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _Logger.LogInformation($"Finish {context.Message.ActivationId}");

        // Nothing asynchronous to do; awaiting a completed task keeps the method async.
        await Task.CompletedTask;
    }
}

状态机定义:

/// <summary>
/// State machine over <c>ArcProcess</c> instances: started by <c>StartProcessingMessage</c>
/// and finalized by <c>ProcessingFinishedMessage</c>.
/// </summary>
public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
    // Registers the correlation-id selectors before the first instance is created.
    static ArcStateMachine() => MessageContracts.Initialize();

    public ArcStateMachine()
    {
        // The current state is kept in ArcProcess.CurrentState.
        InstanceState(instance => instance.CurrentState);

        // Initial event: store the activation id, then move to ProcessingStartedState.
        Initially(
            When(ProcessingStartedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .TransitionTo(ProcessingStartedState));

        // While in ProcessingStartedState: store the activation id and finalize the instance.
        During(ProcessingStartedState,
            When(ProcessingFinishedEvent)
                .Then(ctx => ctx.Instance.ActivationId = ctx.Data.ActivationId)
                .Finalize());
    }

    // States. ProcessingFinishedState is declared but not referenced by the constructor.
    public State ProcessingStartedState { get; }
    public State ProcessingFinishedState { get; }

    // Events, one per message contract.
    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

MassTransit 设置:

// Broker address read from configuration; everything below is skipped when the
// IsNotEmpty() check fails (presumably a null/empty test — confirm against the helper).
var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                // Saga state machine registered on the container configurator, with its queue name
                // and MongoDB repository settings.
                cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
                    .Endpoint(e => e.Name = BusConstants.SagaQueue)
                    .MongoDbRepository(connectionString, r =>
                    {
                        r.DatabaseName = "mongoRepo";
                        r.CollectionName = "WorkflowState";
                    });


                cnf.AddConsumer(typeof(StartProcessingConsumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    // NOTE(review): credentials are hard-coded; consider reading them from configuration.
                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username("guest");
                        hst.Password("guest");
                    });

                    // Endpoints for the saga and consumers registered above.
                    cfg.ConfigureEndpoints(context);
                });
            });

            services.AddMassTransitHostedService();

            // NOTE(review): Swagger is registered only when a RabbitMQ host is configured,
            // because this call sits inside the same if-block — confirm that this is intended.
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

这个例子帮助我理解了 MassTransit 的基本原理。