MassTransit Bus.Publish 呼叫 SendObserver
MassTransit Bus.Publish calls SendObserver
我注意到当我调用 Bus.Publish 时,我的 SendObserver 与我的 PublishObserver 一起被调用。在我最初的场景中,我使用观察者进行一些调试日志记录,我注意到当我调用 Publish 时,PublishObserver 和 SendObserver 都被调用了相同的消息。下面的示例代码重现了该场景:
public class YourMessage { public string Text { get; set; } }
public class SendObserver : ISendObserver {
public Task PreSend<T>(SendContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostSend<T>(SendContext<T> context) where T : class
{
Console.Out.WriteLineAsync($"Message Sent, Id: {context.MessageId}");
return Task.CompletedTask;
}
public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
public class PublishObserver : IPublishObserver
{
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
Console.Out.WriteLineAsync($"Message Published, Id: {context.MessageId}");
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
public class Program
{
public static void Main()
{
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
var host = sbc.Host(new Uri("rabbitmq://rabbitmq/PublishSendTest"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "test_queue", ep =>
{
ep.Handler<YourMessage>(context =>
{
return Console.Out.WriteLineAsync($"Received: {context.Message.Text}");
});
});
});
bus.ConnectSendObserver(new SendObserver());
bus.ConnectPublishObserver(new PublishObserver());
bus.Start();
bus.Publish(new YourMessage { Text = "Hi" });
Console.WriteLine("Press any key to exit");
Console.ReadKey();
bus.Stop();
}
}
输出:
Press any key to exit
Message Sent, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Message Published, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Received: Hi
这是预期的行为吗?如果是这样,我该怎么做才能确定它实际上是否是创建消息的发布调用?
我用的是5.1.5版本
不一致的观察者问题应该在开发版本中得到解决,并且已经创建了一个测试来验证支持的传输上的行为。发布后,发送观察者只能在实际 Send
上调用,而发布观察者只能在实际 Publish
.
上调用
感谢您提出这个问题,我不确定它是如何摆脱困境的。
我注意到当我调用 Bus.Publish 时,我的 SendObserver 与我的 PublishObserver 一起被调用。在我最初的场景中,我使用观察者进行一些调试日志记录,我注意到当我调用 Publish 时,PublishObserver 和 SendObserver 都被调用了相同的消息。下面的示例代码重现了该场景:
public class YourMessage { public string Text { get; set; } }
public class SendObserver : ISendObserver {
public Task PreSend<T>(SendContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostSend<T>(SendContext<T> context) where T : class
{
Console.Out.WriteLineAsync($"Message Sent, Id: {context.MessageId}");
return Task.CompletedTask;
}
public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
public class PublishObserver : IPublishObserver
{
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
Console.Out.WriteLineAsync($"Message Published, Id: {context.MessageId}");
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
public class Program
{
public static void Main()
{
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
var host = sbc.Host(new Uri("rabbitmq://rabbitmq/PublishSendTest"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "test_queue", ep =>
{
ep.Handler<YourMessage>(context =>
{
return Console.Out.WriteLineAsync($"Received: {context.Message.Text}");
});
});
});
bus.ConnectSendObserver(new SendObserver());
bus.ConnectPublishObserver(new PublishObserver());
bus.Start();
bus.Publish(new YourMessage { Text = "Hi" });
Console.WriteLine("Press any key to exit");
Console.ReadKey();
bus.Stop();
}
}
输出:
Press any key to exit
Message Sent, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Message Published, Id: ac4f0000-3051-1065-bbe5-08d6335c9e05
Received: Hi
这是预期的行为吗?如果是这样,我该怎么做才能确定它实际上是否是创建消息的发布调用?
我用的是5.1.5版本
不一致的观察者问题应该在开发版本中得到解决,并且已经创建了一个测试来验证支持的传输上的行为。发布后,发送观察者只能在实际 Send
上调用,而发布观察者只能在实际 Publish
.
感谢您提出这个问题,我不确定它是如何摆脱困境的。