MassTransit Automatonymous - 发送消息时状态不变
MassTransit Automatonymous - State not changing when a message is Sent
我想弄清楚为什么“发送”一条消息不会调用状态机,但如果我“发布”一条消息,它会起作用,我可以看到状态发生变化。
以下是我的代码,它与文档类似,只是我尝试“发送”一条消息。
组件
状态机:
/// <summary>
/// Saga instance data kept for one order by <see cref="OrderStateMachine"/>.
/// </summary>
public class OrderState: SagaStateMachineInstance
{
    /// <summary>Identifier of the saga instance; the state machine correlates events to it by the message's OrderId.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>Current state of the order, stored as an integer (mapped by the state machine's InstanceState call).</summary>
    public int CurrentState { get; set; }
    /// <summary>Order date copied from the SubmitOrder message when the saga is created; may be null.</summary>
    public DateTime? OrderDate { get; set; }
}
/// <summary>
/// State machine that drives an <see cref="OrderState"/> saga through the
/// Submitted, Accepted and Completed states in response to order messages.
/// </summary>
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    /// <summary>State entered after the initial SubmitOrder event.</summary>
    public State Submitted { get; private set; }

    /// <summary>State entered when OrderAccepted arrives while Submitted.</summary>
    public State Accepted { get; private set; }

    /// <summary>State entered when OrderCompleted arrives; also marks the saga as completed.</summary>
    public State Completed { get; private set; }

    /// <summary>Raised by a <c>SubmitOrder</c> message.</summary>
    public Event<SubmitOrder> SubmitOrder { get; private set; }

    /// <summary>Raised by an <c>OrderAccepted</c> message.</summary>
    public Event<OrderAccepted> OrderAccepted { get; private set; }

    /// <summary>Raised by an <c>OrderCompleted</c> message.</summary>
    public Event<OrderCompleted> OrderCompleted { get; private set; }

    /// <summary>
    /// Declares state storage, event correlation and the transitions between states.
    /// </summary>
    public OrderStateMachine()
    {
        // The current state is persisted in the integer CurrentState property.
        InstanceState(saga => saga.CurrentState, Submitted, Accepted, Completed);

        // Every event finds its saga instance through the message's OrderId.
        Event(() => SubmitOrder, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderAccepted, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderCompleted, e => e.CorrelateById(ctx => ctx.Message.OrderId));

        // A new saga records the order date and moves to Submitted.
        Initially(
            When(SubmitOrder)
                .Then(ctx =>
                {
                    ctx.Instance.OrderDate = ctx.Data.OrderDate;
                })
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        // A repeated SubmitOrder for an already accepted order is ignored.
        During(Accepted,
            Ignore(SubmitOrder));

        // Registered with DuringAny: OrderCompleted moves the saga to Completed.
        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        // The instance counts as completed once its current state is Completed.
        SetCompleted(async saga => Completed.Equals(await this.GetState(saga)));
    }
}
消息契约(Contracts):
/// <summary>Message that starts an order saga; carries the order id used for correlation and an optional order date.</summary>
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
/// <summary>Message that moves the saga for <c>OrderId</c> from Submitted to Accepted.</summary>
public record OrderAccepted(Guid OrderId);
/// <summary>Message that moves the saga for <c>OrderId</c> to Completed.</summary>
public record OrderCompleted(Guid OrderId);
消费者:
/// <summary>
/// Consumer for <c>SubmitOrder</c> messages. It does not inspect the message;
/// it only waits for a fixed delay before completing.
/// </summary>
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    // Length of the pause performed for every consumed message, in milliseconds.
    private const int DelayMilliseconds = 2000;

    /// <summary>Waits <see cref="DelayMilliseconds"/> ms and then completes.</summary>
    /// <param name="context">Consume context of the received message (unused).</param>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Task.Delay(DelayMilliseconds);
    }
}
/// <summary>
/// Endpoint definition for <see cref="SubmitOrderConsumer"/>: fixes the endpoint
/// name to "submit-order" and switches off consume-topology configuration.
/// </summary>
public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    /// <summary>Sets the receive endpoint name used for the consumer.</summary>
    public SubmitOrderConsumerDefinition() => EndpointName = "submit-order";

    /// <summary>Disables consume topology on the consumer's receive endpoint.</summary>
    /// <param name="endpointConfigurator">Configurator of the receive endpoint hosting the consumer.</param>
    /// <param name="consumerConfigurator">Configurator of the consumer itself (unused).</param>
    // NOTE(review): per the MassTransit docs this should keep the queue from being
    // bound to the published message-type exchanges, so it only receives messages
    // sent to it directly — confirm against the MassTransit version in use.
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
        => endpointConfigurator.ConfigureConsumeTopology = false;
}
Web API
Program.cs(片段)
// Register MassTransit on the RabbitMQ transport (no consumers are added in this
// fragment), then the hosted service that runs the bus, then the MVC controllers.
builder.Services.AddMassTransit(registration =>
{
    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Local broker, default virtual host.
        rabbit.Host("localhost", "/", rabbitHost =>
        {
            rabbitHost.Username("guest");
            rabbitHost.Password("guest");
        });
    });
});

builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();
订单控制器
/// <summary>
/// API controller exposing <c>POST /order</c>, which sends a new
/// <c>SubmitOrder</c> message to the "submit-order" exchange.
/// </summary>
[Route("order")]
public class OrderController : ControllerBase
{
    // Destination of the Send call. The message goes to this exchange only, so
    // only the endpoint behind it receives the message.
    private static readonly Uri SubmitOrderAddress = new Uri("exchange:submit-order");

    private readonly ISendEndpointProvider _endpoints;

    /// <summary>Creates the controller.</summary>
    /// <param name="sendEndpointProvider">Provider used to resolve the send endpoint.</param>
    public OrderController(ISendEndpointProvider sendEndpointProvider)
        => _endpoints = sendEndpointProvider;

    /// <summary>
    /// Sends a <c>SubmitOrder</c> with a fresh order id and the current local time.
    /// </summary>
    /// <returns>An empty 200 OK result once the message has been sent.</returns>
    [HttpPost]
    public async Task<IActionResult> SendOrder()
    {
        ISendEndpoint target = await _endpoints.GetSendEndpoint(SubmitOrderAddress);
        await target.Send(new SubmitOrder(Guid.NewGuid(), DateTime.Now));

        return Ok();
    }
}
Worker 服务(Worker Service)
Program.cs
using IHost = Microsoft.Extensions.Hosting.IHost;
// Worker host: registers the SubmitOrder consumer (with its definition) and the
// order saga state machine (in-memory repository) on a RabbitMQ bus.
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(registration =>
        {
            registration.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
            registration.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();

            registration.UsingRabbitMq((busContext, rabbit) =>
            {
                // Local broker, default virtual host.
                rabbit.Host("localhost", "/", rabbitHost =>
                {
                    rabbitHost.Username("guest");
                    rabbitHost.Password("guest");
                });

                // The saga is hosted explicitly on its own "saga-order" endpoint...
                rabbit.ReceiveEndpoint(
                    "saga-order",
                    sagaEndpoint => sagaEndpoint.ConfigureSaga<OrderState>(busContext));

                // ...before the remaining registrations get their endpoints configured.
                rabbit.ConfigureEndpoints(busContext);
            });
        });

        services.AddMassTransitHostedService();
        services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();
然后我通过 Postman 执行 POST 到:http://localhost:5000/order
它确实调用了 SubmitOrderConsumer,但由于某种原因,状态机没有被调用(在初始状态下设置订单日期的 Then 处理程序中的断点没有被命中)。我想我缺少了把两者联系在一起的东西。
非常感谢任何反馈。谢谢。
在您的示例中,您应该使用 Publish,尤其是当您有两个消费者(消费者和状态机)分别在各自的端点(队列)上消费该消息时。直接发送(Send)到交换机只会把消息传递到其中一个端点。
我想弄清楚为什么“发送”一条消息不会调用状态机,但如果我“发布”一条消息,它会起作用,我可以看到状态发生变化。
以下是我的代码,它与文档类似,只是我尝试“发送”一条消息。
组件
状态机:
/// <summary>
/// Saga instance data kept for one order by <see cref="OrderStateMachine"/>.
/// </summary>
public class OrderState: SagaStateMachineInstance
{
    /// <summary>Identifier of the saga instance; the state machine correlates events to it by the message's OrderId.</summary>
    public Guid CorrelationId { get; set; }
    /// <summary>Current state of the order, stored as an integer (mapped by the state machine's InstanceState call).</summary>
    public int CurrentState { get; set; }
    /// <summary>Order date copied from the SubmitOrder message when the saga is created; may be null.</summary>
    public DateTime? OrderDate { get; set; }
}
/// <summary>
/// State machine that drives an <see cref="OrderState"/> saga through the
/// Submitted, Accepted and Completed states in response to order messages.
/// </summary>
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    /// <summary>State entered after the initial SubmitOrder event.</summary>
    public State Submitted { get; private set; }

    /// <summary>State entered when OrderAccepted arrives while Submitted.</summary>
    public State Accepted { get; private set; }

    /// <summary>State entered when OrderCompleted arrives; also marks the saga as completed.</summary>
    public State Completed { get; private set; }

    /// <summary>Raised by a <c>SubmitOrder</c> message.</summary>
    public Event<SubmitOrder> SubmitOrder { get; private set; }

    /// <summary>Raised by an <c>OrderAccepted</c> message.</summary>
    public Event<OrderAccepted> OrderAccepted { get; private set; }

    /// <summary>Raised by an <c>OrderCompleted</c> message.</summary>
    public Event<OrderCompleted> OrderCompleted { get; private set; }

    /// <summary>
    /// Declares state storage, event correlation and the transitions between states.
    /// </summary>
    public OrderStateMachine()
    {
        // The current state is persisted in the integer CurrentState property.
        InstanceState(saga => saga.CurrentState, Submitted, Accepted, Completed);

        // Every event finds its saga instance through the message's OrderId.
        Event(() => SubmitOrder, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderAccepted, e => e.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => OrderCompleted, e => e.CorrelateById(ctx => ctx.Message.OrderId));

        // A new saga records the order date and moves to Submitted.
        Initially(
            When(SubmitOrder)
                .Then(ctx =>
                {
                    ctx.Instance.OrderDate = ctx.Data.OrderDate;
                })
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        // A repeated SubmitOrder for an already accepted order is ignored.
        During(Accepted,
            Ignore(SubmitOrder));

        // Registered with DuringAny: OrderCompleted moves the saga to Completed.
        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        // The instance counts as completed once its current state is Completed.
        SetCompleted(async saga => Completed.Equals(await this.GetState(saga)));
    }
}
消息契约(Contracts):
/// <summary>Message that starts an order saga; carries the order id used for correlation and an optional order date.</summary>
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
/// <summary>Message that moves the saga for <c>OrderId</c> from Submitted to Accepted.</summary>
public record OrderAccepted(Guid OrderId);
/// <summary>Message that moves the saga for <c>OrderId</c> to Completed.</summary>
public record OrderCompleted(Guid OrderId);
消费者:
/// <summary>
/// Consumer for <c>SubmitOrder</c> messages. It does not inspect the message;
/// it only waits for a fixed delay before completing.
/// </summary>
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    // Length of the pause performed for every consumed message, in milliseconds.
    private const int DelayMilliseconds = 2000;

    /// <summary>Waits <see cref="DelayMilliseconds"/> ms and then completes.</summary>
    /// <param name="context">Consume context of the received message (unused).</param>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Task.Delay(DelayMilliseconds);
    }
}
/// <summary>
/// Endpoint definition for <see cref="SubmitOrderConsumer"/>: fixes the endpoint
/// name to "submit-order" and switches off consume-topology configuration.
/// </summary>
public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    /// <summary>Sets the receive endpoint name used for the consumer.</summary>
    public SubmitOrderConsumerDefinition() => EndpointName = "submit-order";

    /// <summary>Disables consume topology on the consumer's receive endpoint.</summary>
    /// <param name="endpointConfigurator">Configurator of the receive endpoint hosting the consumer.</param>
    /// <param name="consumerConfigurator">Configurator of the consumer itself (unused).</param>
    // NOTE(review): per the MassTransit docs this should keep the queue from being
    // bound to the published message-type exchanges, so it only receives messages
    // sent to it directly — confirm against the MassTransit version in use.
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
        => endpointConfigurator.ConfigureConsumeTopology = false;
}
Web API
Program.cs(片段)
// Register MassTransit on the RabbitMQ transport (no consumers are added in this
// fragment), then the hosted service that runs the bus, then the MVC controllers.
builder.Services.AddMassTransit(registration =>
{
    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Local broker, default virtual host.
        rabbit.Host("localhost", "/", rabbitHost =>
        {
            rabbitHost.Username("guest");
            rabbitHost.Password("guest");
        });
    });
});

builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();
订单控制器
/// <summary>
/// API controller exposing <c>POST /order</c>, which sends a new
/// <c>SubmitOrder</c> message to the "submit-order" exchange.
/// </summary>
[Route("order")]
public class OrderController : ControllerBase
{
    // Destination of the Send call. The message goes to this exchange only, so
    // only the endpoint behind it receives the message.
    private static readonly Uri SubmitOrderAddress = new Uri("exchange:submit-order");

    private readonly ISendEndpointProvider _endpoints;

    /// <summary>Creates the controller.</summary>
    /// <param name="sendEndpointProvider">Provider used to resolve the send endpoint.</param>
    public OrderController(ISendEndpointProvider sendEndpointProvider)
        => _endpoints = sendEndpointProvider;

    /// <summary>
    /// Sends a <c>SubmitOrder</c> with a fresh order id and the current local time.
    /// </summary>
    /// <returns>An empty 200 OK result once the message has been sent.</returns>
    [HttpPost]
    public async Task<IActionResult> SendOrder()
    {
        ISendEndpoint target = await _endpoints.GetSendEndpoint(SubmitOrderAddress);
        await target.Send(new SubmitOrder(Guid.NewGuid(), DateTime.Now));

        return Ok();
    }
}
Worker 服务(Worker Service)
Program.cs
using IHost = Microsoft.Extensions.Hosting.IHost;
// Worker host: registers the SubmitOrder consumer (with its definition) and the
// order saga state machine (in-memory repository) on a RabbitMQ bus.
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(registration =>
        {
            registration.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
            registration.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();

            registration.UsingRabbitMq((busContext, rabbit) =>
            {
                // Local broker, default virtual host.
                rabbit.Host("localhost", "/", rabbitHost =>
                {
                    rabbitHost.Username("guest");
                    rabbitHost.Password("guest");
                });

                // The saga is hosted explicitly on its own "saga-order" endpoint...
                rabbit.ReceiveEndpoint(
                    "saga-order",
                    sagaEndpoint => sagaEndpoint.ConfigureSaga<OrderState>(busContext));

                // ...before the remaining registrations get their endpoints configured.
                rabbit.ConfigureEndpoints(busContext);
            });
        });

        services.AddMassTransitHostedService();
        services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();
然后我通过 Postman 执行 POST 到:http://localhost:5000/order
它确实调用了 SubmitOrderConsumer,但由于某种原因,状态机没有被调用(在初始状态下设置订单日期的 Then 处理程序中的断点没有被命中)。我想我缺少了把两者联系在一起的东西。非常感谢任何反馈。谢谢。
在您的示例中,您应该使用 Publish,尤其是当您有两个消费者(消费者和状态机)分别在各自的端点(队列)上消费该消息时。直接发送(Send)到交换机只会把消息传递到其中一个端点。