简化 Azure 服务总线中 MassTransit Saga 的设置
Simplify setup for MassTransit Saga in Azure Service Bus
我正在尝试弄清楚如何设置 MassTransit saga 并最大限度地降低 Azure 服务总线中设置的复杂性。
我想的流程是这样的:
- 有人发送了类似
CreateSaga
的命令。它将被发送到一个名为 my-saga-queue
的队列并被 saga 使用。
- 在此基础上,saga 将生成一些其他命令,例如
DoSomething
,这些命令会被其他地方的消费者获取(不是 saga 特定的)。
- 这些消费者将在完成后发布类似
SomethingIsDone
的事件。
- saga 应该使用这些事件并根据它们做出反应。
第 4 步是我在弄清楚要做什么时遇到了一些问题。 SomethingIsDone
事件不能直接发布到 my-saga-queue
我猜(因为它是一个队列),所以他们必须以事件主题的订阅结束。
但是 saga 实例是否可以从该订阅中消费,或者是否必须有一个从订阅到队列的转发?
一些背景信息:我们正在努力简化我们的 ASB 设置,希望这会增加我们的吞吐量(有一些问题)。我们试图摆脱的事情是订阅和队列之间的转发,因为订阅本身基本上就是队列)。
您可以手动配置初始队列以及订阅端点,使用相同的 saga 存储库在每个端点上配置 saga。这是更多的工作,并将总线配置与 saga 消耗的每个事件紧密耦合,但可以混合使用接收端点和订阅端点。
例如,如果您有一个 CartStateMachine
和一个 CartState
,您可以将每个事件配置为类似于:
cfg.SubscriptionEndpoint<CartItemAdded>("cart-state-machine", endpoint =>
{
endpoint.ConfigureSaga<CartState>(context);
});
cfg.SubscriptionEndpoint<CartItemRemoved>("cart-state-machine", endpoint =>
{
endpoint.ConfigureSaga<CartState>(context);
});
那么,初始命令的实际队列(接收端点)将是:
configurator.ReceiveEndpoint("cart-state", endpoint =>
{
endpoint.ConfigureConsumeTopology = false;
endpoint.Subscribe<CreateCart>("cart-state");
endpoint.ConfigureSaga<CartState>(context);
});
您仍然会像往常一样将 saga 状态机添加到容器中:
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<CartStateMachine, CartState>()
.InMemoryRepository();
x.UsingAzureServiceBus((context, cfg) =>
{
// bus configuration
});
});
我正在尝试弄清楚如何设置 MassTransit saga 并最大限度地降低 Azure 服务总线中设置的复杂性。
我想的流程是这样的:
- 有人发送了类似
CreateSaga
的命令。它将被发送到一个名为my-saga-queue
的队列并被 saga 使用。 - 在此基础上,saga 将生成一些其他命令,例如
DoSomething
,这些命令会被其他地方的消费者获取(不是 saga 特定的)。 - 这些消费者将在完成后发布类似
SomethingIsDone
的事件。 - saga 应该使用这些事件并根据它们做出反应。
第 4 步是我在弄清楚要做什么时遇到了一些问题。 SomethingIsDone
事件不能直接发布到 my-saga-queue
我猜(因为它是一个队列),所以他们必须以事件主题的订阅结束。
但是 saga 实例是否可以从该订阅中消费,或者是否必须有一个从订阅到队列的转发?
一些背景信息:我们正在努力简化我们的 ASB 设置,希望这会增加我们的吞吐量(有一些问题)。我们试图摆脱的事情是订阅和队列之间的转发,因为订阅本身基本上就是队列)。
您可以手动配置初始队列以及订阅端点,使用相同的 saga 存储库在每个端点上配置 saga。这是更多的工作,并将总线配置与 saga 消耗的每个事件紧密耦合,但可以混合使用接收端点和订阅端点。
例如,如果您有一个 CartStateMachine
和一个 CartState
,您可以将每个事件配置为类似于:
cfg.SubscriptionEndpoint<CartItemAdded>("cart-state-machine", endpoint =>
{
endpoint.ConfigureSaga<CartState>(context);
});
cfg.SubscriptionEndpoint<CartItemRemoved>("cart-state-machine", endpoint =>
{
endpoint.ConfigureSaga<CartState>(context);
});
那么,初始命令的实际队列(接收端点)将是:
configurator.ReceiveEndpoint("cart-state", endpoint =>
{
endpoint.ConfigureConsumeTopology = false;
endpoint.Subscribe<CreateCart>("cart-state");
endpoint.ConfigureSaga<CartState>(context);
});
您仍然会像往常一样将 saga 状态机添加到容器中:
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<CartStateMachine, CartState>()
.InMemoryRepository();
x.UsingAzureServiceBus((context, cfg) =>
{
// bus configuration
});
});