如何使用 ActiveMQ Artemis 获得具有预定事件的 MassTransit saga
How to get MassTransit saga with scheduled events working with ActiveMQ Artemis
我在使用 Docker 容器中的 ActiveMQ Artemis 2.19.0 运行 使 MassT运行sit (v 7.3.1) saga 与预定事件一起工作时遇到问题。一切正常,没有任何异常,但无论我指定什么延迟,事件的发布都不会延迟。
我已经尝试使用测试工具重现相同的用例,但这里一切正常。
已更新
我为 MassT运行sit 添加了调试日志记录,以便更好地了解时间线。
这是我的测试代码,一个使用测试工具的绿色测试,一个使用 ActiveMQ 的失败测试。
using Automatonymous;
using MassTransit;
using MassTransit.ActiveMqTransport;
using MassTransit.Saga.InMemoryRepository;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using System;
using System.Threading.Tasks;
namespace Tests
{
public static class Config
{
public const int scheduleDelayMillis = 2000;
public const int testScheduleSleepFactor = 5;
}
public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
public interface IScheduledSagaEvent : CorrelatedBy<Guid> { }
public class MySaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? ScheduledSagaEventTokenId { get; set; }
}
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
public MyStateMachine()
{
InstanceState(instance => instance.CurrentState);
Schedule(() => ScheduledSagaEvent, instance => instance.ScheduledSagaEventTokenId, s =>
{
s.Delay = TimeSpan.FromMilliseconds(Config.scheduleDelayMillis);
s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
});
Initially(
When(InitialSagaEvent)
.Schedule(ScheduledSagaEvent, context => context.Init<IScheduledSagaEvent>(new { context.Instance.CorrelationId }))
.TransitionTo(AwaitingScheduledEvent)
);
During(AwaitingScheduledEvent,
When(ScheduledSagaEvent.Received)
.TransitionTo(ScheduledEventReceived)
);
}
public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
public State AwaitingScheduledEvent { get; private set; }
public State ScheduledEventReceived { get; private set; }
public Schedule<MySaga, IScheduledSagaEvent> ScheduledSagaEvent { get; private set; }
}
public class SagaTests
{
[Test]
public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_TestHarness()
{
var provider = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(cfg =>
{
cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
cfg.AddSagaStateMachineTestHarness<MyStateMachine, MySaga>();
})
.AddLogging(configure =>
{
configure.AddFilter("MassTransit", LogLevel.Debug);
configure.AddSimpleConsole(options =>
{
options.UseUtcTimestamp = true;
options.TimestampFormat = "HH:mm:ss.fff ";
});
})
.BuildServiceProvider(true);
var harness = provider.GetRequiredService<InMemoryTestHarness>();
var logger = provider.GetRequiredService<ILogger<SagaTests>>();
harness.OnConfigureInMemoryBus += configurator =>
{
configurator.UseDelayedMessageScheduler();
};
await harness.Start();
try
{
var sagaId = NewId.NextGuid();
var sagaHarness = provider.GetRequiredService<IStateMachineSagaTestHarness<MySaga, MyStateMachine>>();
await harness.Bus.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
// Wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent
// (Using Thread.Sleep to make this test use the same logic as the ActiveMq test)
await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
var saga = sagaHarness.Created.Contains(sagaId);
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);
// Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
saga = sagaHarness.Sagas.Contains(sagaId);
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been published/consumed
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
}
finally
{
await harness.Stop();
await provider.DisposeAsync();
}
}
[Test]
public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq()
{
var provider = new ServiceCollection()
.AddMassTransit(cfg =>
{
cfg.AddDelayedMessageScheduler();
cfg.UsingActiveMq((context, config) =>
{
config.Host("artemis", 61616, configureHost =>
{
configureHost.Username("admin");
configureHost.Password("admin");
});
config.EnableArtemisCompatibility();
config.UseDelayedMessageScheduler();
config.ConfigureEndpoints(context);
});
cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
})
.AddLogging(configure =>
{
configure.AddFilter("MassTransit", LogLevel.Debug);
configure.AddSimpleConsole(options =>
{
options.UseUtcTimestamp = true;
options.TimestampFormat = "HH:mm:ss.fff ";
});
})
.BuildServiceProvider(true);
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
try
{
var sagaId = NewId.NextGuid();
var sagaRepo = provider.GetRequiredService<IndexedSagaDictionary<MySaga>>();
await busControl.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
// Must wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent using ActiveMq
await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
Assert.That(sagaRepo.Count, Is.EqualTo(1));
var sagaInstance = sagaRepo[sagaId];
Assert.That(sagaInstance, Is.Not.Null);
var saga = sagaInstance.Instance;
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent))); // <-- Line 166 that fails
Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);
// Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
sagaInstance = sagaRepo[sagaId];
Assert.That(sagaInstance, Is.Not.Null);
saga = sagaInstance.Instance;
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been published/consumed
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
}
finally
{
await busControl.StopAsync();
await provider.DisposeAsync();
}
}
}
}
测试 Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq
在第 166 行失败:
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
这是我要验证预定事件是否尚未传送到 saga 的地方。
这是测试的日志
Message:
String lengths are both 22. Strings differ at index 0.
Expected: "AwaitingScheduledEvent"
But was: "ScheduledEventReceived"
-----------^
Stack Trace:
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 168
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 187
GenericAdapter`1.BlockUntilCompleted()
NoMessagePumpStrategy.WaitForCompletion(AwaitAdapter awaiter)
AsyncToSyncAdapter.Await(Func`1 invoke)
TestMethodCommand.RunTestMethod(TestExecutionContext context)
TestMethodCommand.Execute(TestExecutionContext context)
<>c__DisplayClass4_0.<PerformWork>b__0()
<>c__DisplayClass1_0`1.<DoIsolated>b__0(Object _)
Standard Output:
15:19:07.714 info: MassTransit[0]
Configured endpoint My, Saga: Tests.MySaga, State Machine: Tests.MyStateMachine
15:19:07.802 dbug: MassTransit[0]
Starting bus: activemq://artemis:61616/
15:19:07.823 dbug: MassTransit[0]
Connect: admin@artemis:61616
15:19:08.003 dbug: MassTransit[0]
Connected: admin@artemis:61616 (client-id: ID:LNOR012579-63177-637823531478654470-0:0, version: 1.8.0.0)
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.026 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.027 dbug: MassTransit[0]
Get queue name: My, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.039 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: My?consumer.prefetchSize=24
15:19:08.041 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent?consumer.prefetchSize=24
15:19:08.042 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent?consumer.prefetchSize=24
15:19:08.044 dbug: MassTransit[0]
Consumers Ready: activemq://artemis:61616/My
15:19:08.044 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/My
15:19:08.048 info: MassTransit[0]
Bus started: activemq://artemis:61616/
15:19:08.096 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.158 dbug: MassTransit[0]
SEND activemq://artemis:61616/VirtualTopic.Tests.IInitialSagaEvent?type=topic 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent
15:19:08.207 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Created Tests.IInitialSagaEvent
15:19:08.213 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Added Tests.IInitialSagaEvent
15:19:08.231 dbug: MassTransit.ReceiveTransport[0]
Create send transport: activemq://artemis:61616/My
15:19:08.233 dbug: MassTransit.ReceiveTransport[0]
Get queue name: My, durable
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
15:19:08.610 dbug: MassTransit[0]
Stopping bus: activemq://artemis:61616/
15:19:08.615 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/My
15:19:08.615 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/My
15:19:08.624 dbug: MassTransit[0]
Stopping send transport: My
15:19:08.625 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/My
15:19:08.625 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Consumer completed activemq://artemis:61616/My: 2 received, 1 concurrent
15:19:08.625 dbug: MassTransit[0]
Stopping send transport: VirtualTopic.Tests.IInitialSagaEvent
15:19:08.626 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.638 dbug: MassTransit[0]
Disconnect: admin@artemis:61616
15:19:08.644 dbug: MassTransit[0]
Disconnected: admin@artemis:61616
15:19:08.645 info: MassTransit[0]
Bus stopped: activemq://artemis:61616/
从日志来看,消息似乎在发送后 10 毫秒就已传送到 saga?
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
我想我可能在某处遗漏了一些配置部分,但是 运行 没有想法,所以非常感谢任何提示。
在幕后,MassTransit 使用 OpenWire 协议与 ActiveMQ Artemis 进行通信。目前 ActiveMQ Artemis 不支持此类 OpenWire 客户端使用的 AMQ_SCHEDULED_DELAY
header。我打开了ARTEMIS-3711 to deal with this and sent a pull request。这应该在 ActiveMQ Artemis 2.21.0 中得到修复,它应该在本月底进行投票。
我在使用 Docker 容器中的 ActiveMQ Artemis 2.19.0 运行 使 MassT运行sit (v 7.3.1) saga 与预定事件一起工作时遇到问题。一切正常,没有任何异常,但无论我指定什么延迟,事件的发布都不会延迟。
我已经尝试使用测试工具重现相同的用例,但这里一切正常。
已更新 我为 MassT运行sit 添加了调试日志记录,以便更好地了解时间线。
这是我的测试代码,一个使用测试工具的绿色测试,一个使用 ActiveMQ 的失败测试。
using Automatonymous;
using MassTransit;
using MassTransit.ActiveMqTransport;
using MassTransit.Saga.InMemoryRepository;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using System;
using System.Threading.Tasks;
namespace Tests
{
public static class Config
{
public const int scheduleDelayMillis = 2000;
public const int testScheduleSleepFactor = 5;
}
public interface IInitialSagaEvent : CorrelatedBy<Guid> { }
public interface IScheduledSagaEvent : CorrelatedBy<Guid> { }
public class MySaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? ScheduledSagaEventTokenId { get; set; }
}
public class MyStateMachine : MassTransitStateMachine<MySaga>
{
public MyStateMachine()
{
InstanceState(instance => instance.CurrentState);
Schedule(() => ScheduledSagaEvent, instance => instance.ScheduledSagaEventTokenId, s =>
{
s.Delay = TimeSpan.FromMilliseconds(Config.scheduleDelayMillis);
s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
});
Initially(
When(InitialSagaEvent)
.Schedule(ScheduledSagaEvent, context => context.Init<IScheduledSagaEvent>(new { context.Instance.CorrelationId }))
.TransitionTo(AwaitingScheduledEvent)
);
During(AwaitingScheduledEvent,
When(ScheduledSagaEvent.Received)
.TransitionTo(ScheduledEventReceived)
);
}
public Event<IInitialSagaEvent> InitialSagaEvent { get; private set; }
public State AwaitingScheduledEvent { get; private set; }
public State ScheduledEventReceived { get; private set; }
public Schedule<MySaga, IScheduledSagaEvent> ScheduledSagaEvent { get; private set; }
}
public class SagaTests
{
[Test]
public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_TestHarness()
{
var provider = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(cfg =>
{
cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
cfg.AddSagaStateMachineTestHarness<MyStateMachine, MySaga>();
})
.AddLogging(configure =>
{
configure.AddFilter("MassTransit", LogLevel.Debug);
configure.AddSimpleConsole(options =>
{
options.UseUtcTimestamp = true;
options.TimestampFormat = "HH:mm:ss.fff ";
});
})
.BuildServiceProvider(true);
var harness = provider.GetRequiredService<InMemoryTestHarness>();
var logger = provider.GetRequiredService<ILogger<SagaTests>>();
harness.OnConfigureInMemoryBus += configurator =>
{
configurator.UseDelayedMessageScheduler();
};
await harness.Start();
try
{
var sagaId = NewId.NextGuid();
var sagaHarness = provider.GetRequiredService<IStateMachineSagaTestHarness<MySaga, MyStateMachine>>();
await harness.Bus.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
// Wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent
// (Using Thread.Sleep to make this test use the same logic as the ActiveMq test)
await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
var saga = sagaHarness.Created.Contains(sagaId);
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);
// Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
saga = sagaHarness.Sagas.Contains(sagaId);
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been published/consumed
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
}
finally
{
await harness.Stop();
await provider.DisposeAsync();
}
}
[Test]
public async Task Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq()
{
var provider = new ServiceCollection()
.AddMassTransit(cfg =>
{
cfg.AddDelayedMessageScheduler();
cfg.UsingActiveMq((context, config) =>
{
config.Host("artemis", 61616, configureHost =>
{
configureHost.Username("admin");
configureHost.Password("admin");
});
config.EnableArtemisCompatibility();
config.UseDelayedMessageScheduler();
config.ConfigureEndpoints(context);
});
cfg.AddSagaStateMachine<MyStateMachine, MySaga>().InMemoryRepository();
})
.AddLogging(configure =>
{
configure.AddFilter("MassTransit", LogLevel.Debug);
configure.AddSimpleConsole(options =>
{
options.UseUtcTimestamp = true;
options.TimestampFormat = "HH:mm:ss.fff ";
});
})
.BuildServiceProvider(true);
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync();
try
{
var sagaId = NewId.NextGuid();
var sagaRepo = provider.GetRequiredService<IndexedSagaDictionary<MySaga>>();
await busControl.Publish<IInitialSagaEvent>(new { CorrelationId = sagaId });
// Must wait a little bit to give MassTransit a chance to publish and consume the IInitialSagaEvent using ActiveMq
await Task.Delay(Config.scheduleDelayMillis / Config.testScheduleSleepFactor);
Assert.That(sagaRepo.Count, Is.EqualTo(1));
var sagaInstance = sagaRepo[sagaId];
Assert.That(sagaInstance, Is.Not.Null);
var saga = sagaInstance.Instance;
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been scheduled, but not published/consumed yet
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent))); // <-- Line 166 that fails
Assert.That(saga.ScheduledSagaEventTokenId, Is.Not.Null);
// Wait long enough for the delayed IScheduledSagaEvent to get published and consumed
await Task.Delay(Config.scheduleDelayMillis * Config.testScheduleSleepFactor);
sagaInstance = sagaRepo[sagaId];
Assert.That(sagaInstance, Is.Not.Null);
saga = sagaInstance.Instance;
Assert.That(saga, Is.Not.Null);
// Checks to verify that the IScheduledSagaEvent has been published/consumed
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.ScheduledEventReceived)));
Assert.That(saga.ScheduledSagaEventTokenId, Is.Null);
}
finally
{
await busControl.StopAsync();
await provider.DisposeAsync();
}
}
}
}
测试 Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq
在第 166 行失败:
Assert.That(saga.CurrentState, Is.EqualTo(nameof(MyStateMachine.AwaitingScheduledEvent)));
这是我要验证预定事件是否尚未传送到 saga 的地方。
这是测试的日志
Message:
String lengths are both 22. Strings differ at index 0.
Expected: "AwaitingScheduledEvent"
But was: "ScheduledEventReceived"
-----------^
Stack Trace:
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 168
SagaTests.Should_Delay_Publish_Of_Scheduled_Event_Using_ActiveMq() line 187
GenericAdapter`1.BlockUntilCompleted()
NoMessagePumpStrategy.WaitForCompletion(AwaitAdapter awaiter)
AsyncToSyncAdapter.Await(Func`1 invoke)
TestMethodCommand.RunTestMethod(TestExecutionContext context)
TestMethodCommand.Execute(TestExecutionContext context)
<>c__DisplayClass4_0.<PerformWork>b__0()
<>c__DisplayClass1_0`1.<DoIsolated>b__0(Object _)
Standard Output:
15:19:07.714 info: MassTransit[0]
Configured endpoint My, Saga: Tests.MySaga, State Machine: Tests.MyStateMachine
15:19:07.802 dbug: MassTransit[0]
Starting bus: activemq://artemis:61616/
15:19:07.823 dbug: MassTransit[0]
Connect: admin@artemis:61616
15:19:08.003 dbug: MassTransit[0]
Connected: admin@artemis:61616 (client-id: ID:LNOR012579-63177-637823531478654470-0:0, version: 1.8.0.0)
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.025 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.026 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.027 dbug: MassTransit[0]
Get queue name: My, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent, durable
15:19:08.027 dbug: MassTransit[0]
Get queue name: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.039 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: My?consumer.prefetchSize=24
15:19:08.041 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IInitialSagaEvent::Consumer.My.VirtualTopic.Tests.IInitialSagaEvent?consumer.prefetchSize=24
15:19:08.042 dbug: MassTransit[0]
Created consumer for activemq://artemis:61616/My: VirtualTopic.Tests.IScheduledSagaEvent::Consumer.My.VirtualTopic.Tests.IScheduledSagaEvent?consumer.prefetchSize=24
15:19:08.044 dbug: MassTransit[0]
Consumers Ready: activemq://artemis:61616/My
15:19:08.044 dbug: MassTransit[0]
Endpoint Ready: activemq://artemis:61616/My
15:19:08.048 info: MassTransit[0]
Bus started: activemq://artemis:61616/
15:19:08.096 dbug: MassTransit[0]
Get topic name: VirtualTopic.Tests.IInitialSagaEvent, durable
15:19:08.158 dbug: MassTransit[0]
SEND activemq://artemis:61616/VirtualTopic.Tests.IInitialSagaEvent?type=topic 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent
15:19:08.207 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Created Tests.IInitialSagaEvent
15:19:08.213 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Added Tests.IInitialSagaEvent
15:19:08.231 dbug: MassTransit.ReceiveTransport[0]
Create send transport: activemq://artemis:61616/My
15:19:08.233 dbug: MassTransit.ReceiveTransport[0]
Get queue name: My, durable
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
15:19:08.610 dbug: MassTransit[0]
Stopping bus: activemq://artemis:61616/
15:19:08.615 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/My
15:19:08.615 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/My
15:19:08.624 dbug: MassTransit[0]
Stopping send transport: My
15:19:08.625 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/My
15:19:08.625 dbug: MassTransit[0]
Endpoint Stopping: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Stopping receive transport: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.625 dbug: MassTransit[0]
Consumer completed activemq://artemis:61616/My: 2 received, 1 concurrent
15:19:08.625 dbug: MassTransit[0]
Stopping send transport: VirtualTopic.Tests.IInitialSagaEvent
15:19:08.626 dbug: MassTransit[0]
Endpoint Completed: activemq://artemis:61616/LNOR012579_testhost_bus_dfkyyygxedrfw57abdpynfz7nw?temporary=true
15:19:08.638 dbug: MassTransit[0]
Disconnect: admin@artemis:61616
15:19:08.644 dbug: MassTransit[0]
Disconnected: admin@artemis:61616
15:19:08.645 info: MassTransit[0]
Bus stopped: activemq://artemis:61616/
从日志来看,消息似乎在发送后 10 毫秒就已传送到 saga?
15:19:08.246 dbug: MassTransit.ReceiveTransport[0]
SEND activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent
15:19:08.252 dbug: MassTransit.ReceiveTransport[0]
SAGA:Tests.MySaga:19540000-cf40-c85a-3b6b-08da0116fd83 Used Tests.IScheduledSagaEvent
15:19:08.253 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-621f-08da0116fd8c Tests.IInitialSagaEvent Tests.MySaga(00:00:00.0573113)
15:19:08.256 dbug: MassTransit.ReceiveTransport[0]
RECEIVE activemq://artemis:61616/My 19540000-cf40-c85a-ac85-08da0116fd9f Tests.IScheduledSagaEvent Tests.MySaga(00:00:00.0045235)
我想我可能在某处遗漏了一些配置部分,但是 运行 没有想法,所以非常感谢任何提示。
在幕后,MassTransit 使用 OpenWire 协议与 ActiveMQ Artemis 进行通信。目前 ActiveMQ Artemis 不支持此类 OpenWire 客户端使用的 AMQ_SCHEDULED_DELAY
header。我打开了ARTEMIS-3711 to deal with this and sent a pull request。这应该在 ActiveMQ Artemis 2.21.0 中得到修复,它应该在本月底进行投票。