如何使用 ActiveMQ Artemis 获得具有预定事件的 MassTransit saga

How to get MassTransit saga with scheduled events working with ActiveMQ Artemis

我在使用 Docker 容器中的 ActiveMQ Artemis 2.19.0 运行 使 MassT运行sit (v 7.3.1) saga 与预定事件一起工作时遇到问题。一切正常,没有任何异常,但无论我指定什么延迟,事件的发布都不会延迟。

我已经尝试使用测试工具重现相同的用例,但这里一切正常。

已更新 我为 MassT运行sit 添加了调试日志记录,以便更好地了解时间线。

这是我的测试代码,一个使用测试工具的绿色测试,一个使用 ActiveMQ 的失败测试。

using Automatonymous;
using MassTransit;
using MassTransit.ActiveMqTransport;
using MassTransit.Saga.InMemoryRepository;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using System;
using System.Threading.Tasks;

namespace Tests
{
    public static class Config
    {
        public const int scheduleDelayMillis = 2000;
        public const int testScheduleSleepFactor = 5;
    }

    public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
    public interface IScheduledSagaEvent : CorrelatedBy<Guid> { }

    public class MySaga : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
        public Guid? ScheduledSagaEventTokenId { get; set; }
    }

    public class MyStateMachine : MassTransitStateMachine<MySaga>
    {
        public MyStateMachine()
        {
            InstanceState(instance => instance.CurrentState);
            Schedule(() => ScheduledSagaEvent, instance => instance.ScheduledSagaEventTokenId, s =>
            {
                s.Delay = TimeSpan.FromMilliseconds(Config.scheduleDelayMillis);
                s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
            });
            Initially(
                When(InitialSagaEvent)
                    .Schedule(ScheduledSagaEvent, context => context.Init<IScheduledSagaEvent>(new { context.Instance.CorrelationId }))
                    .TransitionTo(AwaitingScheduledEvent)
            );
            During(AwaitingScheduledEvent,
                When(ScheduledSagaEvent.Received)
                    .TransitionTo(ScheduledEventReceived)
            );
        }
        public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
        public State AwaitingScheduledEvent { get; private set; }
        public State ScheduledEventReceived { get; private set; }
        public Schedule<MySaga, IScheduledSagaEvent> ScheduledSagaEvent { get; private set; }
    }

    public class SagaTests
    {
        [Test]
        public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_TestHarness()
        {
            var provider = new ServiceCollection()
                .AddMassTransitInMemoryTestHarness(cfg =>
                {
                    cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
                    cfg.AddSagaStateMachineTestHarness<MyStateMachine, MySaga>();
                })
                .AddLogging(configure =>
                {
                    configure.AddFilter("MassTransit", LogLevel.Debug);
                    configure.AddSimpleConsole(options =>
                    {
                        options.UseUtcTimestamp = true;
                        options.TimestampFormat = "HH:mm:ss.fff ";
                    });
                })
                .BuildServiceProvider(true);
            var harness = provider.GetRequiredService<InMemoryTestHarness>();
            var logger = provider.GetRequiredService<ILogger<SagaTests>>();
            harness.OnConfigureInMemoryBus += configurator =>
            {
                configurator.UseDelayedMessageScheduler();
            };
            await harness.Start();
            try
            {
                var sagaId = NewId.NextGuid();
                var sagaHarness = provider.GetRequiredService<IStateMachineSagaTestHarness<MySaga, MyStateMachine>>();

                await harness.Bus.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
                // Wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent
                // (Using Thread.Sleep to make this test use the same logic as the ActiveMq test)
                await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);

                var saga = sagaHarness.Created.Contains(sagaId);
                Assert.That(saga, Is.Not.Null);

                // Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
                Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
                Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);

                // Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
                await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);

                saga = sagaHarness.Sagas.Contains(sagaId);
                Assert.That(saga, Is.Not.Null);

                // Checks to verify that the IScheduledSagaEvent has been published/consumed
                Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
                Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
            }
            finally
            {
                await harness.Stop();
                await provider.DisposeAsync();
            }
        }

        [Test]
        public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq()
        {
            var provider = new ServiceCollection()
                .AddMassTransit(cfg =>
                {
                    cfg.AddDelayedMessageScheduler();
                    cfg.UsingActiveMq((context, config) =>
                    {
                        config.Host("artemis", 61616, configureHost =>
                        {
                            configureHost.Username("admin");
                            configureHost.Password("admin");
                        });
                        config.EnableArtemisCompatibility();
                        config.UseDelayedMessageScheduler();
                        config.ConfigureEndpoints(context);
                    });
                    cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
                })
                .AddLogging(configure =>
                {
                    configure.AddFilter("MassTransit", LogLevel.Debug);
                    configure.AddSimpleConsole(options =>
                    {
                        options.UseUtcTimestamp = true;
                        options.TimestampFormat = "HH:mm:ss.fff ";
                    });
                })
                .BuildServiceProvider(true);
            var busControl = provider.GetRequiredService<IBusControl>();
            await busControl.StartAsync();
            try
            {
                var sagaId = NewId.NextGuid();
                var sagaRepo = provider.GetRequiredService<IndexedSagaDictionary<MySaga>>();

                await busControl.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
                // Must wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent using ActiveMq
                await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);

                Assert.That(sagaRepo.Count, Is.EqualTo(1));
                var sagaInstance = sagaRepo[sagaId];
                Assert.That(sagaInstance, Is.Not.Null);
                var saga = sagaInstance.Instance;
                Assert.That(saga, Is.Not.Null);

                // Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
                Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent))); // <-- Line 166 that fails
                Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);

                // Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
                await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);

                sagaInstance = sagaRepo[sagaId];
                Assert.That(sagaInstance, Is.Not.Null);
                saga = sagaInstance.Instance;
                Assert.That(saga, Is.Not.Null);

                // Checks to verify that the IScheduledSagaEvent has been published/consumed
                Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
                Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
            }
            finally
            {
                await busControl.StopAsync();
                await provider.DisposeAsync();
            }
        }
    }
}

测试 Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq 在第 166 行失败:

Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));

这是我要验证预定事件是否尚未传送到 saga 的地方。

这是测试的日志

Message: 
  String lengths are both 22. Strings differ at index 0.
  Expected: "AwaitingScheduledEvent"
  But was:  "ScheduledEventReceived"
  -----------^


  Stack Trace: 
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 168
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 187
GenericAdapter`1.BlockUntilCompleted()
NoMessagePumpStrategy.WaitForCompletion(AwaitAdapter awaiter)
AsyncToSyncAdapter.Await(Func`1 invoke)
TestMethodCommand.RunTestMethod(TestExecutionContext context)
TestMethodCommand.Execute(TestExecutionContext context)
<>c__DisplayClass4_0.<PerformWork>b__0()
<>c__DisplayClass1_0`1.<DoIsolated>b__0(Object _)

  Standard Output: 
15:19:07.714 info: MassTransit[0]
      Configured endpoint My, Saga: Tests.MySaga, State Machine: Tests.MyStateMachine
15:19:07.802 dbug: MassTransit[0]
      Starting bus: activemq://artemis:61616/
15:19:07.823 dbug: MassTransit[0]
      Connect: admin@artemis:61616
15:19:08.003 dbug: MassTransit[0]
      Connected: admin@artemis:61616 (client-id: ID:LNOR012579-63177-637823531478654470-0:0, version: 1.8.0.0)
15:19:08.025 dbug: MassTransit[0]
      Get topic name: VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.025 dbug: MassTransit[0]
      Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.026 dbug: MassTransit[0]
      Endpoint Ready: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.027 dbug: MassTransit[0]
      Get queue name: My, durable
15:19:08.027 dbug: MassTransit[0]
      Get queue name: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.027 dbug: MassTransit[0]
      Get queue name: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.039 dbug: MassTransit[0]
      Created consumer for activemq://artemis:61616/My: My?consumer.prefetchSize=24
15:19:08.041 dbug: MassTransit[0]
      Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent?consumer.prefetchSize=24
15:19:08.042 dbug: MassTransit[0]
      Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent?consumer.prefetchSize=24
15:19:08.044 dbug: MassTransit[0]
      Consumers Ready: activemq://artemis:61616/My
15:19:08.044 dbug: MassTransit[0]
      Endpoint Ready: activemq://artemis:61616/My
15:19:08.048 info: MassTransit[0]
      Bus started: activemq://artemis:61616/
15:19:08.096 dbug: MassTransit[0]
      Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.158 dbug: MassTransit[0]
      SEND activemq://artemis:61616/VirtualTopic.Tests.IInitialSagaEvent?type=topic 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent
15:19:08.207 dbug: MassTransit.ReceiveTransport[0]
      SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Created Tests.IInitialSagaEvent
15:19:08.213 dbug: MassTransit.ReceiveTransport[0]
      SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Added Tests.IInitialSagaEvent
15:19:08.231 dbug: MassTransit.ReceiveTransport[0]
      Create send transport: activemq://artemis:61616/My
15:19:08.233 dbug: MassTransit.ReceiveTransport[0]
      Get queue name: My, durable
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
      SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
      SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
      RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
      RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
15:19:08.610 dbug: MassTransit[0]
      Stopping bus: activemq://artemis:61616/
15:19:08.615 dbug: MassTransit[0]
      Endpoint Stopping: activemq://artemis:61616/My
15:19:08.615 dbug: MassTransit[0]
      Stopping receive transport: activemq://artemis:61616/My
15:19:08.624 dbug: MassTransit[0]
      Stopping send transport: My
15:19:08.625 dbug: MassTransit[0]
      Endpoint Completed: activemq://artemis:61616/My
15:19:08.625 dbug: MassTransit[0]
      Endpoint Stopping: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
      Stopping receive transport: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
      Consumer completed activemq://artemis:61616/My: 2 received, 1 concurrent
15:19:08.625 dbug: MassTransit[0]
      Stopping send transport: VirtualTopic.Tests.IInitialSagaEvent
15:19:08.626 dbug: MassTransit[0]
      Endpoint Completed: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.638 dbug: MassTransit[0]
      Disconnect: admin@artemis:61616
15:19:08.644 dbug: MassTransit[0]
      Disconnected: admin@artemis:61616
15:19:08.645 info: MassTransit[0]
      Bus stopped: activemq://artemis:61616/

从日志来看,消息似乎在发送后 10 毫秒就已传送到 saga?

15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
      SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
      SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
      RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
      RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)

我想我可能在某处遗漏了一些配置部分,但是 运行 没有想法,所以非常感谢任何提示。

在幕后,MassTransit 使用 OpenWire 协议与 ActiveMQ Artemis 进行通信。目前 ActiveMQ Artemis 不支持此类 OpenWire 客户端使用的 AMQ_SCHEDULED_DELAY header。我打开了ARTEMIS-3711 to deal with this and sent a pull request。这应该在 ActiveMQ Artemis 2.21.0 中得到修复,它应该在本月底进行投票。