MassTransit Automatonymous - 发送消息时状态不变

MassTransit Automatonymous - State not changing when a message is Sent

我想弄清楚为什么“发送”一条消息不会调用状态机,但如果我“发布”一条消息,它会起作用,我可以看到状态发生变化。

以下是我的代码,它与文档类似,只是我尝试“发送”一条消息。

组件

状态机:
public class OrderState: SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
    public DateTime? OrderDate { get; set; }
}

public class OrderStateMachine: MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
    public State Completed { get; private set; }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState, Submitted, Accepted, Completed);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .Then(context => context.Instance.OrderDate = context.Data.OrderDate)
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
        
        During(Accepted,
            Ignore(SubmitOrder));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));
        
        SetCompleted(async instance =>
        {
            var currentState = await this.GetState(instance);
            return Completed.Equals(currentState);
        });
    }
}
合同:
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
public record OrderAccepted(Guid OrderId);
public record OrderCompleted(Guid OrderId);
消费者:
public class SubmitOrderConsumer: IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Task.Delay(2000);
    }
}

public class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "submit-order";
    }
    
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        endpointConfigurator.ConfigureConsumeTopology = false;
    }
}

网络API

Program.cs(片段)
// Add services to the container.
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host("localhost", "/", hostConfigurator =>
        {
            hostConfigurator.Username("guest");
            hostConfigurator.Password("guest");
        });
    });
});
builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();

订单控制器
[Route("order")]
public class OrderController : ControllerBase
{
    private readonly ISendEndpointProvider _sendEndpointProvider;
    public OrderController(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }
    
    [HttpPost]
    public async Task<IActionResult> SendOrder()
    {
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("exchange:submit-order"));
        await endpoint.Send(new SubmitOrder(Guid.NewGuid(), DateTime.Now));
        return Ok();
    }
}

工人服务

Program.cs
using IHost = Microsoft.Extensions.Hosting.IHost;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(cfg =>
        {
            cfg.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
            cfg.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();
            cfg.UsingRabbitMq((context, rabbitMqConfigurator) =>
            {
                rabbitMqConfigurator.Host("localhost", "/", hostConfigurator =>
                {
                    hostConfigurator.Username("guest");
                    hostConfigurator.Password("guest");
                });
                rabbitMqConfigurator.ReceiveEndpoint("saga-order", endpointConfigurator =>
                {
                    endpointConfigurator.ConfigureSaga<OrderState>(context);
                });
                rabbitMqConfigurator.ConfigureEndpoints(context);
            });
        });
        services.AddMassTransitHostedService();
        services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

然后我通过 Postman 执行 POST 到:http://localhost:5000/order

它确实调用了 SubmitOrderConsumer,但由于某种原因,状态机没有被调用(它不会在将订单日期设置在初始状态的 Then 处理程序中遇到断点。).我想我缺少将两者联系在一起的东西。 非常感谢任何反馈。谢谢。

在您的示例中,您希望使用 Publish,尤其是在您有两个消费者(消费者和状态机)在单独的端点(队列)上消耗信息。直接发送到交换只会将消息发送到端点之一。