如何在 MassTransit saga 的初始状态下进行调度

How to Schedule in the Initial state of MassTransit saga

我创建了一个状态机 saga,它将接收多条消息,并且我希望它仅在给定的时间段过去之后才继续工作。我认为在 MassTransit 中实现这一点的唯一方法是使用框架的调度功能。

saga 代码(为简洁起见已缩短)如下:

/// <summary>
/// State machine saga over <see cref="CheckFeedSubmissionState"/>. It records the submission ids carried by
/// <see cref="CheckFeedSubmissionCommand"/> messages, schedules a <see cref="SchedulingCompletionTimeoutExpired"/>
/// message when the first command arrives, and runs <see cref="QueueGetFeedSubmissionListActivity"/> once that
/// scheduled message is received.
/// </summary>
public class CheckFeedSubmissionStateMachine : MassTransitStateMachine<CheckFeedSubmissionState>
{
    /// <summary>State entered after the first command; further commands are accumulated here.</summary>
    public State? WaitingForTimeoutExpiration { get; private set; }

    /// <summary>State entered after the scheduled timeout message has been received.</summary>
    public State? FetchingSubmissionData { get; private set; }

    /// <summary>Raised for every <see cref="CheckFeedSubmissionCommand"/>; correlated by the message's PartnerGuid.</summary>
    public Event<CheckFeedSubmissionCommand> CheckFeedSubmissionCommandReceived { get; private set; }

    public Event<FeedSubmissionListReceivedEvent> FeedSubmissionListReceived { get; private set; }

    public Event<FeedSubmissionListErrorReceivedEvent> FeedSubmissionListErrorReceived { get; private set; }

    public Event<FeedSubmissionResultReceivedEvent> FeedSubmissionResultReceived { get; private set; }

    public Event<FeedSubmissionResultErrorReceivedEvent> FeedSubmissionResultErrorReceived { get; private set; }

    /// <summary>Schedule whose received message moves the saga out of <see cref="WaitingForTimeoutExpiration"/>.</summary>
    public Schedule<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> ScheduleCompletionTimeout { get; private set; }

    // Delay, in seconds, applied to the scheduled timeout message.
    private readonly int _scheduleDelay;

    /// <summary>
    /// Reads the "ScheduleDelay" setting and wires up the events, the schedule and the state transitions.
    /// </summary>
    /// <param name="options">Saga options; the "ScheduleDelay" entry must contain an integer number of seconds.</param>
    /// <exception cref="FormatException">The "ScheduleDelay" value is not a valid integer.</exception>
    public CheckFeedSubmissionStateMachine(IOptions<SagasOptions> options)
    {
        // Configuration values are machine-readable, so parse them independently of the current culture.
        _scheduleDelay = int.Parse(
            options.Value.CheckFeedSubmissionStateMachine["ScheduleDelay"],
            System.Globalization.CultureInfo.InvariantCulture);

        Configure();
        BuildProcess();
    }

    /// <summary>Declares event correlation, the timeout schedule and the instance state property.</summary>
    private void Configure()
    {
        Event(
            () => CheckFeedSubmissionCommandReceived,
            e => e.CorrelateById(x => x.Message.PartnerGuid));
        Schedule(() => ScheduleCompletionTimeout, instance => instance.SchedulingCompletionTimeoutTokenId, s =>
        {
            s.Delay = TimeSpan.FromSeconds(_scheduleDelay);
            // The scheduled message carries the saga's CorrelationId (see ScheduleEvent).
            s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
        });
        InstanceState(state => state.CurrentState);
    }

    /// <summary>Declares the behaviour for the initial state and for <see cref="WaitingForTimeoutExpiration"/>.</summary>
    private void BuildProcess()
    {
        Initially(
            When(CheckFeedSubmissionCommandReceived)
                .Then(InitializeState)
                .Then(StoreSubmissionIds)
                .Schedule(ScheduleCompletionTimeout, ScheduleEvent)
                .TransitionTo(WaitingForTimeoutExpiration));
        
        During(WaitingForTimeoutExpiration,
            When(CheckFeedSubmissionCommandReceived)
                .Then(StoreSubmissionIds),
            When(ScheduleCompletionTimeout.Received)
                .Activity(QueueGetFeedSubmissionListRequest)
                .TransitionTo(FetchingSubmissionData));

        // the rest omitted for brevity
    }

    /// <summary>Copies the partner id from the initiating command onto the saga instance.</summary>
    private void InitializeState(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx) =>
        ctx.Instance.PartnerId = ctx.Data.PartnerId;

    /// <summary>
    /// Records the command's feed submission id as <c>Submitted</c> and remembers the command's correlation id for it.
    /// </summary>
    private void StoreSubmissionIds(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx)
    {
        ctx.Instance.SubmissionIdToStatusMap[ctx.Data.FeedSubmissionId] = FeedProcessingStatus.Submitted;
        ctx.Instance.SubmissionIdsToCorrelationIdsMap[ctx.Data.FeedSubmissionId] = ctx.Data.CorrelationId;
    }

    /// <summary>Builds the timeout message to schedule, carrying the saga instance's CorrelationId.</summary>
    private Task<SchedulingCompletionTimeoutExpired> ScheduleEvent<TEvent>(
        ConsumeEventContext<CheckFeedSubmissionState, TEvent> ctx) where TEvent : class =>
        ctx.Init<SchedulingCompletionTimeoutExpired>(new { ctx.Instance.CorrelationId });

    /// <summary>Selects <see cref="QueueGetFeedSubmissionListActivity"/> as the activity run when the timeout is received.</summary>
    private EventActivityBinder<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> QueueGetFeedSubmissionListRequest(
        IStateMachineActivitySelector<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> sel) =>
        sel.OfType<QueueGetFeedSubmissionListActivity>();
}

我为它创建的一个测试旨在检查两条已发布的消息是否都已保存在 saga 中,代码如下:

    /// <summary>
    /// Publishes two commands for the same partner and verifies that the saga is waiting for the timeout
    /// and has stored both submission ids.
    /// </summary>
    [Fact]
    public async Task GivenCheckFeedSubmissionCommand_WhenAnotherCheckFeedSubmissionCommandIsReceived_ThenTheSagaStoresBothSubmissionIds()
    {
        var (harness, sagaHarness) = GetTestComponents();
        var partnerGuid = Guid.NewGuid();

        await harness.Start();

        try
        {
            await harness.Bus.Publish(GetInitiatingEvent("1", partnerGuid));
            await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 1);
            await harness.Bus.Publish(GetInitiatingEvent("2", partnerGuid));
            await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 2);
            
            var state = sagaHarness.Sagas.Contains(partnerGuid);

            // Fail with a readable assertion message (instead of a NullReferenceException)
            // when no saga instance exists for the partner.
            state.Should().NotBeNull();
            state.CurrentState.Should().Be("WaitingForTimeoutExpiration");
            state.SubmissionIdsToCorrelationIdsMap.Should().ContainKeys("1", "2");
        }
        finally
        {
            // Always stop the harness so the in-memory bus is shut down even when an assertion fails.
            await harness.Stop();
        }
    }

    /// <summary>
    /// Creates the bus harness and the saga harness, registering saga options whose "ScheduleDelay" is "0".
    /// </summary>
    private static (InMemoryTestHarness, IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>) GetTestComponents()
    {
        return TestHarnessFactory.Create<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>(
            services => services.AddSingleton(
                Options.Create(
                    new SagasOptions
                    {
                        CheckFeedSubmissionStateMachine = new Dictionary<string, string>
                        {
                            { "ScheduleDelay", "0" }
                        }
                    })));
    }

    /// <summary>
    /// Builds a <see cref="CheckFeedSubmissionCommand"/> with a fresh Guid as its first argument, the literal "1"
    /// as its second, and the supplied partner guid and feed submission id.
    /// </summary>
    private static CheckFeedSubmissionCommand GetInitiatingEvent(string feedSubmissionId, Guid partnerGuid)
    {
        var messageId = Guid.NewGuid();

        return new CheckFeedSubmissionCommand(messageId, "1", partnerGuid, feedSubmissionId);
    }

    /// <summary>
    /// Waits until both the bus harness and the saga harness have consumed at least
    /// <paramref name="expectedCount"/> messages of type <typeparamref name="TEvent"/>.
    /// </summary>
    /// <typeparam name="TEvent">Message type to wait for.</typeparam>
    /// <param name="harness">Bus test harness whose consumed messages are inspected.</param>
    /// <param name="sagaHarness">Saga test harness whose consumed messages are inspected.</param>
    /// <param name="expectedCount">Number of consumed messages to wait for.</param>
    private static async Task Consumption<TEvent>(
        InMemoryTestHarness harness,
        IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine> sagaHarness,
        int expectedCount)
        where TEvent : class
    {
        if (expectedCount == 1)
        {
            var harnessConsumed = harness.Consumed.SelectAsync<TEvent>().Any();
            var sagaConsumed = sagaHarness.Consumed.SelectAsync<TEvent>().Any();

            await Task.WhenAll(harnessConsumed, sagaConsumed);
            return;
        }

        while (true)
        {
            // Start both counts before awaiting so they run concurrently.
            var harnessConsumedTask = harness.Consumed.SelectAsync<TEvent>().Count();
            var sagaConsumedTask = sagaHarness.Consumed.SelectAsync<TEvent>().Count();

            var harnessConsumedCount = await harnessConsumedTask;
            var sagaConsumedCount = await sagaConsumedTask;

            // Both sides must have reached the expected count; stopping when only one of them
            // has would let the caller inspect the saga before it processed the message.
            if (harnessConsumedCount >= expectedCount && sagaConsumedCount >= expectedCount)
            {
                return;
            }

            // Poll again after a pause; no delay is paid once the counts are satisfied.
            await Task.Delay(1000);
        }
    }

问题是,当我在 Initially/When 阶段调用 .Schedule(ScheduleCompletionTimeout, ScheduleEvent) 这一行时,它以某种方式干扰了状态切换,saga 不会切换到下一个状态——它无限期地停留在 Initial 状态。我通过检查测试中的 state 变量以及在 InitializeState 方法中设置断点确认了这一点——该断点被命中了两次。当我删除执行调度的那一行时,测试就通过了,但我不能这样做,因为我需要调度功能。有人能帮忙吗?

您可能没有为带有测试工具的总线配置调度程序。如果您为测试启用了日志记录,您会在日志中看到错误。

测试工具的总线配置应该允许您添加调度程序:

// Adds a message scheduler to the test harness bus configuration; according to the
// surrounding answer, the saga's Schedule(...) call fails without one.
configurator.UseDelayedMessageScheduler();

测试工具上有一个配置事件,名为 OnConfigureInMemoryBus 或类似的名称,您可以使用它来配置总线。