Masstransit UseInMemoryOutbox 不适用于 MultiBus Saga?

Masstransit UseInMemoryOutbox not working with MultiBus Saga?

我实现了自定义 activity 来支持多总线传奇。 但我发现了一个问题,即在状态传奇保存到 Redis 之前,我收到了来自 SecondBus 的已发布消息。 我认为 UseInMemoryOutbox() 应该处理这个或者我错了。

public class TestActivity : Activity<SagaState, IFirstBusRequest>
{
    private readonly ISecondBus _secondBus;

    public TestActivity(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }


    public async Task Execute(BehaviorContext<SagaState, IFirstBusRequest> context, Behavior<SagaState, IFirstBusRequest> next)
    {
        var endpoint = await _secondBus.GetSendEndpoint(new Uri($"queue:second-bus-request"));
        await endpoint.Send(new { }); // send immediately
    }      
}

正确,这是一个已知限制,因为另一个总线实例是完全独立的总线,不是接收总线上发件箱的一部分。

如果您需要涉及发件箱,请考虑在第一条总线上生成消息,并在该总线上设置一个消费者,该消费者仅将该消息转发到另一条总线。

public class ForwardMessageConsumer<T> :
    IConsumer<T>
    where T : class
{
    readonly ISecondBus _bus;

    public ForwardMessageConsumer(ISecondBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<T> context)
    {
        var messagePipe = new ForwardMessagePipe<T>(context);

        await _bus.Publish(context.Message, messagePipe, context.CancellationToken);
    }
}

然后您可以只拥有一个带有这些消息转发器的端点:

x.AddConsumer<ForwardMessageConsumer<A>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");
x.AddConsumer<ForwardMessageConsumer<B>>()
    .Endpoint(e => e.Name = "second-bus-forwarder");

然后,ConfigureEndpoints 会将这些消费者放在接收端点上,该端点会将这些消息转发到其他总线实例。

The endpoint configuration is to put multiple consumers on the same queue, to avoid having multiple queues that just forward messages. It's assumed that Publish would be used from the activity.