如何在 MassTransit saga 的初始状态下进行调度
How to Schedule in the Initial state of MassTransit saga
我创建了一个状态机 saga,它将接收多条消息,并且仅在给定的时间段过去后,我希望它继续工作。我认为使用公共交通工具的唯一方法是使用框架的调度功能。
传奇代码(为简洁起见缩短)如下:
public class CheckFeedSubmissionStateMachine : MassTransitStateMachine<CheckFeedSubmissionState>
{
public State? WaitingForTimeoutExpiration { get; private set; }
public State? FetchingSubmissionData { get; private set; }
public Event<CheckFeedSubmissionCommand> CheckFeedSubmissionCommandReceived { get; private set; }
public Event<FeedSubmissionListReceivedEvent> FeedSubmissionListReceived { get; private set; }
public Event<FeedSubmissionListErrorReceivedEvent> FeedSubmissionListErrorReceived { get; private set; }
public Event<FeedSubmissionResultReceivedEvent> FeedSubmissionResultReceived { get; private set; }
public Event<FeedSubmissionResultErrorReceivedEvent> FeedSubmissionResultErrorReceived { get; private set; }
public Schedule<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> ScheduleCompletionTimeout { get; private set; }
private readonly int _scheduleDelay;
public CheckFeedSubmissionStateMachine(IOptions<SagasOptions> options)
{
_scheduleDelay = int.Parse(options.Value.CheckFeedSubmissionStateMachine["ScheduleDelay"]);
Configure();
BuildProcess();
}
private void Configure()
{
Event(
() => CheckFeedSubmissionCommandReceived,
e => e.CorrelateById(x => x.Message.PartnerGuid));
Schedule(() => ScheduleCompletionTimeout, instance => instance.SchedulingCompletionTimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(_scheduleDelay);
s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
});
InstanceState(state => state.CurrentState);
}
private void BuildProcess()
{
Initially(
When(CheckFeedSubmissionCommandReceived)
.Then(InitializeState)
.Then(StoreSubmissionIds)
.Schedule(ScheduleCompletionTimeout, ScheduleEvent)
.TransitionTo(WaitingForTimeoutExpiration));
During(WaitingForTimeoutExpiration,
When(CheckFeedSubmissionCommandReceived)
.Then(StoreSubmissionIds),
When(ScheduleCompletionTimeout.Received)
.Activity(QueueGetFeedSubmissionListRequest)
.TransitionTo(FetchingSubmissionData));
// the rest ommited for brevity
}
private void InitializeState(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx) =>
ctx.Instance.PartnerId = ctx.Data.PartnerId;
private void StoreSubmissionIds(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx)
{
ctx.Instance.SubmissionIdToStatusMap[ctx.Data.FeedSubmissionId] = FeedProcessingStatus.Submitted;
ctx.Instance.SubmissionIdsToCorrelationIdsMap[ctx.Data.FeedSubmissionId] = ctx.Data.CorrelationId;
}
private Task<SchedulingCompletionTimeoutExpired> ScheduleEvent<TEvent>(
ConsumeEventContext<CheckFeedSubmissionState, TEvent> ctx) where TEvent : class =>
ctx.Init<SchedulingCompletionTimeoutExpired>(new { ctx.Instance.CorrelationId });
private EventActivityBinder<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> QueueGetFeedSubmissionListRequest(
IStateMachineActivitySelector<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> sel) =>
sel.OfType<QueueGetFeedSubmissionListActivity>();
}
我为它创建的一个测试旨在检查两条已发布的消息是否都已保存在 saga 中,代码如下:
[Fact]
public async Task GivenCheckFeedSubmissionCommand_WhenAnotherCheckFeedSubmissionCommandIsReceived_ThenTheSagaStoresBothSubmissionIds()
{
var (harness, sagaHarness) = GetTestComponents();
var partnerGuid = Guid.NewGuid();
await harness.Start();
try
{
await harness.Bus.Publish(GetInitiatingEvent("1", partnerGuid));
await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 1);
await harness.Bus.Publish(GetInitiatingEvent("2", partnerGuid));
await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 2);
var state = sagaHarness.Sagas.Contains(partnerGuid);
state.CurrentState.Should().Be("WaitingForTimeoutExpiration");
state.SubmissionIdsToCorrelationIdsMap.Should().ContainKeys("1", "2");
}
finally
{
await harness.Stop();
}
}
private static (InMemoryTestHarness, IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>) GetTestComponents() =>
TestHarnessFactory.Create<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>(
sp => sp
.AddSingleton(Options.Create(new SagasOptions
{
CheckFeedSubmissionStateMachine = new Dictionary<string, string>
{
["ScheduleDelay"] = "0"
}
})));
private static CheckFeedSubmissionCommand GetInitiatingEvent(string feedSubmissionId, Guid partnerGuid) =>
new(Guid.NewGuid(), "1", partnerGuid, feedSubmissionId);
private static async Task Consumption<TEvent>(
InMemoryTestHarness harness,
IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine> sagaHarness,
int expectedCount)
where TEvent : class
{
if (expectedCount == 1)
{
var harnessConsumed = harness.Consumed.SelectAsync<TEvent>().Any();
var sagaConsumed = sagaHarness.Consumed.SelectAsync<TEvent>().Any();
await Task.WhenAll(harnessConsumed, sagaConsumed);
}
else
{
int harnessConsumedCount;
int sagaConsumedCount;
do
{
var harnessConsumedTask = harness.Consumed.SelectAsync<TEvent>().Count();
var sagaConsumedTask = sagaHarness.Consumed.SelectAsync<TEvent>().Count();
harnessConsumedCount = await harnessConsumedTask;
sagaConsumedCount = await sagaConsumedTask;
await Task.Delay(1000);
} while (harnessConsumedCount < expectedCount && sagaConsumedCount < expectedCount);
}
}
问题是,当我在 Initially/When
阶段调用此行 .Schedule(ScheduleCompletionTimeout, ScheduleEvent)
时,它以某种方式干扰了状态切换,并且 saga 不会切换到下一个状态 - 它停留在 Initial无限期地状态。我通过检查测试中的状态变量和在 InitializeState
方法中设置断点来确认它 - 它被命中两次。当我删除执行调度的那条线时,测试通过了,尽管我不能那样做,因为我需要它。有帮助吗?
您可能没有为带有测试工具的总线配置调度程序。如果您为测试启用了日志记录,您会在日志中看到错误。
测试工具的总线配置应该允许您添加调度程序:
configurator.UseDelayedMessageScheduler();
测试工具上有一个配置事件,OnConfigureInMemoryBus
或类似的东西,您可以使用它来配置总线。
我创建了一个状态机 saga,它将接收多条消息,并且仅在给定的时间段过去后,我希望它继续工作。我认为使用公共交通工具的唯一方法是使用框架的调度功能。
传奇代码(为简洁起见缩短)如下:
public class CheckFeedSubmissionStateMachine : MassTransitStateMachine<CheckFeedSubmissionState>
{
public State? WaitingForTimeoutExpiration { get; private set; }
public State? FetchingSubmissionData { get; private set; }
public Event<CheckFeedSubmissionCommand> CheckFeedSubmissionCommandReceived { get; private set; }
public Event<FeedSubmissionListReceivedEvent> FeedSubmissionListReceived { get; private set; }
public Event<FeedSubmissionListErrorReceivedEvent> FeedSubmissionListErrorReceived { get; private set; }
public Event<FeedSubmissionResultReceivedEvent> FeedSubmissionResultReceived { get; private set; }
public Event<FeedSubmissionResultErrorReceivedEvent> FeedSubmissionResultErrorReceived { get; private set; }
public Schedule<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> ScheduleCompletionTimeout { get; private set; }
private readonly int _scheduleDelay;
public CheckFeedSubmissionStateMachine(IOptions<SagasOptions> options)
{
_scheduleDelay = int.Parse(options.Value.CheckFeedSubmissionStateMachine["ScheduleDelay"]);
Configure();
BuildProcess();
}
private void Configure()
{
Event(
() => CheckFeedSubmissionCommandReceived,
e => e.CorrelateById(x => x.Message.PartnerGuid));
Schedule(() => ScheduleCompletionTimeout, instance => instance.SchedulingCompletionTimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(_scheduleDelay);
s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
});
InstanceState(state => state.CurrentState);
}
private void BuildProcess()
{
Initially(
When(CheckFeedSubmissionCommandReceived)
.Then(InitializeState)
.Then(StoreSubmissionIds)
.Schedule(ScheduleCompletionTimeout, ScheduleEvent)
.TransitionTo(WaitingForTimeoutExpiration));
During(WaitingForTimeoutExpiration,
When(CheckFeedSubmissionCommandReceived)
.Then(StoreSubmissionIds),
When(ScheduleCompletionTimeout.Received)
.Activity(QueueGetFeedSubmissionListRequest)
.TransitionTo(FetchingSubmissionData));
// the rest ommited for brevity
}
private void InitializeState(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx) =>
ctx.Instance.PartnerId = ctx.Data.PartnerId;
private void StoreSubmissionIds(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx)
{
ctx.Instance.SubmissionIdToStatusMap[ctx.Data.FeedSubmissionId] = FeedProcessingStatus.Submitted;
ctx.Instance.SubmissionIdsToCorrelationIdsMap[ctx.Data.FeedSubmissionId] = ctx.Data.CorrelationId;
}
private Task<SchedulingCompletionTimeoutExpired> ScheduleEvent<TEvent>(
ConsumeEventContext<CheckFeedSubmissionState, TEvent> ctx) where TEvent : class =>
ctx.Init<SchedulingCompletionTimeoutExpired>(new { ctx.Instance.CorrelationId });
private EventActivityBinder<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> QueueGetFeedSubmissionListRequest(
IStateMachineActivitySelector<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> sel) =>
sel.OfType<QueueGetFeedSubmissionListActivity>();
}
我为它创建的一个测试旨在检查两条已发布的消息是否都已保存在 saga 中,代码如下:
[Fact]
public async Task GivenCheckFeedSubmissionCommand_WhenAnotherCheckFeedSubmissionCommandIsReceived_ThenTheSagaStoresBothSubmissionIds()
{
var (harness, sagaHarness) = GetTestComponents();
var partnerGuid = Guid.NewGuid();
await harness.Start();
try
{
await harness.Bus.Publish(GetInitiatingEvent("1", partnerGuid));
await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 1);
await harness.Bus.Publish(GetInitiatingEvent("2", partnerGuid));
await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 2);
var state = sagaHarness.Sagas.Contains(partnerGuid);
state.CurrentState.Should().Be("WaitingForTimeoutExpiration");
state.SubmissionIdsToCorrelationIdsMap.Should().ContainKeys("1", "2");
}
finally
{
await harness.Stop();
}
}
private static (InMemoryTestHarness, IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>) GetTestComponents() =>
TestHarnessFactory.Create<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>(
sp => sp
.AddSingleton(Options.Create(new SagasOptions
{
CheckFeedSubmissionStateMachine = new Dictionary<string, string>
{
["ScheduleDelay"] = "0"
}
})));
private static CheckFeedSubmissionCommand GetInitiatingEvent(string feedSubmissionId, Guid partnerGuid) =>
new(Guid.NewGuid(), "1", partnerGuid, feedSubmissionId);
private static async Task Consumption<TEvent>(
InMemoryTestHarness harness,
IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine> sagaHarness,
int expectedCount)
where TEvent : class
{
if (expectedCount == 1)
{
var harnessConsumed = harness.Consumed.SelectAsync<TEvent>().Any();
var sagaConsumed = sagaHarness.Consumed.SelectAsync<TEvent>().Any();
await Task.WhenAll(harnessConsumed, sagaConsumed);
}
else
{
int harnessConsumedCount;
int sagaConsumedCount;
do
{
var harnessConsumedTask = harness.Consumed.SelectAsync<TEvent>().Count();
var sagaConsumedTask = sagaHarness.Consumed.SelectAsync<TEvent>().Count();
harnessConsumedCount = await harnessConsumedTask;
sagaConsumedCount = await sagaConsumedTask;
await Task.Delay(1000);
} while (harnessConsumedCount < expectedCount && sagaConsumedCount < expectedCount);
}
}
问题是,当我在 Initially/When
阶段调用此行 .Schedule(ScheduleCompletionTimeout, ScheduleEvent)
时,它以某种方式干扰了状态切换,并且 saga 不会切换到下一个状态 - 它停留在 Initial无限期地状态。我通过检查测试中的状态变量和在 InitializeState
方法中设置断点来确认它 - 它被命中两次。当我删除执行调度的那条线时,测试通过了,尽管我不能那样做,因为我需要它。有帮助吗?
您可能没有为带有测试工具的总线配置调度程序。如果您为测试启用了日志记录,您会在日志中看到错误。
测试工具的总线配置应该允许您添加调度程序:
configurator.UseDelayedMessageScheduler();
测试工具上有一个配置事件,OnConfigureInMemoryBus
或类似的东西,您可以使用它来配置总线。