MassTransit 传奇 运行 预取 > 1
MassTransit saga running with prefetch > 1
我有一个 MassTransit saga 状态机(源自 Automatonymous.MassTransitStateMachine),我正在尝试解决一个问题,该问题仅在我将端点配置 prefetchCount 设置为大于 1 的值时才会出现。
问题是 'StartupCompletedEvent' 已发布,然后在将 saga 状态持久保存到数据库之前立即进行处理。
状态机配置如下:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Requested)
.ThenAsync(async ctx =>
{
Console.WriteLine("Starting up...");
await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
Console.WriteLine("Done starting up...");
}
.TransitionTo(StartingUp)
);
During(StartingUp,
When(StartupCompleted)
.ThenAsync(InitialiseSagaInstanceData)
.TransitionTo(Initialising)
);
// snip...!
当我的 saga 收到请求的事件时会发生什么:
- Initially 块的 ThenAsync 处理程序被命中。此时,没有 saga 数据持久保存到 repo(如预期的那样)。
- StartupCompletedEvent 已发布到总线。这里也没有 saga 数据保存到 repo。
- Initially 声明的 ThenAsync 块完成。至此,saga数据终于持久化了。
- 没有其他事情发生。
此时队列中没有消息,StartupCompletedEvent丢失。但是,数据库中有一个saga实例。
我已经尝试启动并确定其他线程之一(因为我的预取> 1)已经拾取了事件,没有在数据库中找到任何具有 correlationId 的传奇,并丢弃了事件。因此事件在 saga 有机会被持久化之前被发布和处理。
如果我将以下内容添加到 Initially 处理程序中:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
然后我开始执行 Console.WriteLine。我对此的理解是事件已收到,但已路由到 Initially 处理程序,因为 correlationId 不存在任何传奇。如果我在此时设置一个断点并检查 saga 回购协议,则还没有 saga。
可能还值得一提的其他几点:
- 我将 saga 回购上下文设置为使用 IsolationLevel.Serializable
- 我正在使用 EntityFrameworkSagaRepository
- 预取计数设置为 1 时一切正常
- 我正在使用 Ninject 作为 DI,我的 SagaRepository 是线程范围的,所以我想预取计数允许的每个处理程序都有自己的 saga 存储库副本
- 如果我在一个单独的线程中发布 StartupCompletedEvent,之前有 1000 毫秒的休眠时间,那么一切正常。我认为这是因为 saga repo 已经完成了 saga 状态的持久化,所以当事件最终发布并被处理程序接收时,saga 状态将从 repo 中正确检索。
如果我遗漏了什么,请告诉我;我已经尝试提供我认为有价值的所有内容,但不会让这个问题太长...
我也有这个问题,我想 post Chris 的评论作为答案,以便人们可以找到它。
解决方案是启用发件箱,以便在 saga 持久化之前保留消息。
c.ReceiveEndpoint("queue", e =>
{
e.UseInMemoryOutbox();
// other endpoint configuration here
}
我有一个 MassTransit saga 状态机(源自 Automatonymous.MassTransitStateMachine),我正在尝试解决一个问题,该问题仅在我将端点配置 prefetchCount 设置为大于 1 的值时才会出现。
问题是 'StartupCompletedEvent' 已发布,然后在将 saga 状态持久保存到数据库之前立即进行处理。
状态机配置如下:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
When(Requested)
.ThenAsync(async ctx =>
{
Console.WriteLine("Starting up...");
await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
Console.WriteLine("Done starting up...");
}
.TransitionTo(StartingUp)
);
During(StartingUp,
When(StartupCompleted)
.ThenAsync(InitialiseSagaInstanceData)
.TransitionTo(Initialising)
);
// snip...!
当我的 saga 收到请求的事件时会发生什么:
- Initially 块的 ThenAsync 处理程序被命中。此时,没有 saga 数据持久保存到 repo(如预期的那样)。
- StartupCompletedEvent 已发布到总线。这里也没有 saga 数据保存到 repo。
- Initially 声明的 ThenAsync 块完成。至此,saga数据终于持久化了。
- 没有其他事情发生。
此时队列中没有消息,StartupCompletedEvent丢失。但是,数据库中有一个saga实例。
我已经尝试启动并确定其他线程之一(因为我的预取> 1)已经拾取了事件,没有在数据库中找到任何具有 correlationId 的传奇,并丢弃了事件。因此事件在 saga 有机会被持久化之前被发布和处理。
如果我将以下内容添加到 Initially 处理程序中:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
然后我开始执行 Console.WriteLine。我对此的理解是事件已收到,但已路由到 Initially 处理程序,因为 correlationId 不存在任何传奇。如果我在此时设置一个断点并检查 saga 回购协议,则还没有 saga。
可能还值得一提的其他几点:
- 我将 saga 回购上下文设置为使用 IsolationLevel.Serializable
- 我正在使用 EntityFrameworkSagaRepository
- 预取计数设置为 1 时一切正常
- 我正在使用 Ninject 作为 DI,我的 SagaRepository 是线程范围的,所以我想预取计数允许的每个处理程序都有自己的 saga 存储库副本
- 如果我在一个单独的线程中发布 StartupCompletedEvent,之前有 1000 毫秒的休眠时间,那么一切正常。我认为这是因为 saga repo 已经完成了 saga 状态的持久化,所以当事件最终发布并被处理程序接收时,saga 状态将从 repo 中正确检索。
如果我遗漏了什么,请告诉我;我已经尝试提供我认为有价值的所有内容,但不会让这个问题太长...
我也有这个问题,我想 post Chris 的评论作为答案,以便人们可以找到它。
解决方案是启用发件箱,以便在 saga 持久化之前保留消息。
c.ReceiveEndpoint("queue", e =>
{
e.UseInMemoryOutbox();
// other endpoint configuration here
}