MassTransit Bus.Publish 呼叫 SendObserver

MassTransit Bus.Publish calls SendObserver

我注意到当我调用 Bus.Publish 时,我的 SendObserver 与我的 PublishObserver 一起被调用。在我最初的场景中,我使用观察者进行一些调试日志记录,我注意到当我调用 Publish 时,PublishObserver 和 SendObserver 都被调用了相同的消息。下面的示例代码重现了该场景:

public class YourMessage { public string Text { get; set; } }

public class SendObserver : ISendObserver {
    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        Console.Out.WriteLineAsync($"Message Sent, Id: {context.MessageId}");

        return Task.CompletedTask;
    }

    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}

public class PublishObserver : IPublishObserver
{
    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        Console.Out.WriteLineAsync($"Message Published, Id: {context.MessageId}");

        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}
public class Program
{
    public static void Main()
    {
        var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            var host = sbc.Host(new Uri("rabbitmq://rabbitmq/PublishSendTest"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            sbc.ReceiveEndpoint(host, "test_queue", ep =>
            {
                ep.Handler<YourMessage>(context =>
                {
                    return Console.Out.WriteLineAsync($"Received: {context.Message.Text}");
                });
            });
        });

        bus.ConnectSendObserver(new SendObserver());
        bus.ConnectPublishObserver(new PublishObserver());

        bus.Start();

        bus.Publish(new YourMessage { Text = "Hi" });

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();

        bus.Stop();
    }
}

输出:

Press any key to exit
Message Sent, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Message Published, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Received: Hi

这是预期的行为吗?如果是这样,我该怎么做才能确定它实际上是否是创建消息的发布调用?

我用的是5.1.5版本

不一致的观察者问题应该在开发版本中得到解决,并且已经创建了一个测试来验证支持的传输上的行为。发布后,发送观察者只能在实际 Send 上调用,而发布观察者只能在实际 Publish.

上调用

感谢您提出这个问题,我不确定它是如何摆脱困境的。