如何在 Asp.Net 核心应用程序中配置 MassTransit Saga
How to configure MassTransit Saga in Asp.Net Core application
我正在尝试将简单的 MassTransit Saga 集成到 ASP.NET 核心应用程序中。
在 ConfigureServices
期间,我有:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ISagaRepository<Request>, InMemorySagaRepository<Request>>();
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<RequestStateMachine, Request>();
x.AddBus(provider => Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(provider);
}));
});
}
稍后我发布消息:
var bus = context.RequestServices.GetService<IBusControl>();
await bus.Publish<IRequestCreated>(new
{
CorrelationId = Guid.NewGuid(),
ClientId = 1,
});
但它从未到达 Saga 实例。
我的传奇是这样的:
public class RequestStateMachine : MassTransitStateMachine<Request>
{
public RequestStateMachine()
{
InstanceState(x => x.CurrentState);
Event(
() => RequestCreated,
x => x.CorrelateById(context => context.Message.CorrelationId).SelectId(context => Guid.NewGuid()));
Initially(
When(RequestCreated)
.Then(context =>
{
Console.WriteLine($"Request received, id = {context.Instance.CorrelationId}");
context.Instance.RequestId = 10;
})
.TransitionTo(Active)
);
SetCompletedWhenFinalized();
}
public State Active { get; protected set; }
public Event<IRequestCreated> RequestCreated { get; protected set; }
}
public class Request : SagaStateMachineInstance
{
public string CurrentState { get; set; }
public Guid CorrelationId { get; set; }
public long RequestId { get; set; }
public Guid? ExpirationId { get; set; }
}
我想我做错了什么,但不知道是什么。
我不得不承认这有点令人困惑。我们在 Microsoft DI 包和 ASP.NET 核心集成包中都有 AddMassTransit
方法,它们做不同的事情。
AspNetCoreIntegration
包中的 AddMassTransit
还注册了启动和停止总线的服务。因此,这段代码可以解决您的问题:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ISagaRepository<Request>, InMemorySagaRepository<Request>>();
services.AddMassTransit(
provider =>
Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(provider);
},
x => x.AddSagaStateMachine<RequestStateMachine, Request>()
);
}
你使用的方法只是在容器中将总线注册为IBus
、IBusControl
、ISendEndpointProvider
和IPublishEndpointPervider
,但并没有考虑启动并停下公共汽车。我在代码示例中使用的方法还注册了主机服务并(可选)添加了健康检查。
我正在尝试将简单的 MassTransit Saga 集成到 ASP.NET 核心应用程序中。
在 ConfigureServices
期间,我有:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ISagaRepository<Request>, InMemorySagaRepository<Request>>();
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<RequestStateMachine, Request>();
x.AddBus(provider => Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(provider);
}));
});
}
稍后我发布消息:
var bus = context.RequestServices.GetService<IBusControl>();
await bus.Publish<IRequestCreated>(new
{
CorrelationId = Guid.NewGuid(),
ClientId = 1,
});
但它从未到达 Saga 实例。
我的传奇是这样的:
public class RequestStateMachine : MassTransitStateMachine<Request>
{
public RequestStateMachine()
{
InstanceState(x => x.CurrentState);
Event(
() => RequestCreated,
x => x.CorrelateById(context => context.Message.CorrelationId).SelectId(context => Guid.NewGuid()));
Initially(
When(RequestCreated)
.Then(context =>
{
Console.WriteLine($"Request received, id = {context.Instance.CorrelationId}");
context.Instance.RequestId = 10;
})
.TransitionTo(Active)
);
SetCompletedWhenFinalized();
}
public State Active { get; protected set; }
public Event<IRequestCreated> RequestCreated { get; protected set; }
}
public class Request : SagaStateMachineInstance
{
public string CurrentState { get; set; }
public Guid CorrelationId { get; set; }
public long RequestId { get; set; }
public Guid? ExpirationId { get; set; }
}
我想我做错了什么,但不知道是什么。
我不得不承认这有点令人困惑。我们在 Microsoft DI 包和 ASP.NET 核心集成包中都有 AddMassTransit
方法,它们做不同的事情。
AspNetCoreIntegration
包中的 AddMassTransit
还注册了启动和停止总线的服务。因此,这段代码可以解决您的问题:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ISagaRepository<Request>, InMemorySagaRepository<Request>>();
services.AddMassTransit(
provider =>
Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(provider);
},
x => x.AddSagaStateMachine<RequestStateMachine, Request>()
);
}
你使用的方法只是在容器中将总线注册为IBus
、IBusControl
、ISendEndpointProvider
和IPublishEndpointPervider
,但并没有考虑启动并停下公共汽车。我在代码示例中使用的方法还注册了主机服务并(可选)添加了健康检查。