在运行时为 Rebus 生成消息处理程序
Generating message handlers for Rebus in runtime
我遇到了一个问题,我想使用 Rebus 在 rabbitmq 中订阅和处理消息。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。
我需要以某种方式让 Rebus 订阅和处理所有这些消息类型并将它们转发(发布)到另一个 rabbitmq 实例。我的服务本质上是转发消息,并在这样做时添加一个自定义的 rebus header。
问题是我不想为每种消息类型生成处理程序 类(因为无论消息类型如何,功能都是相同的)。我也不想每次在第三方程序集中添加新的消息类型时更新我的代码(编写新的处理程序 类)。
我尝试使用 TypeBuilder 为通过反射找到的每个类型动态创建消息处理程序 类,但感觉有点乱所以我希望有另一种方法吗?
下面的代码概述了我希望实现的目标,即使代码无法编译。
public void SubscribeAndHandleMessages()
{
// These types will be determined runtime by using reflection but thats omitted for clarity
var messageTypes = new List<Type>(){typeof(MessageA), typeof(MessageB)};
var activator = new BuiltinHandlerActivator();
Configure.With(activator)
.Transport(t => t.UseRabbitMq(_rabbitConnectionString, "MyQueue"))
.Start();
//Subscribe and register handlers
foreach (var type in messageTypes)
{
activator.Bus.Subscribe(type); //This works, I can see the queue subscribing to the correct topics
activator.Handle<type>(async (bus, context, message) => //This doesnt work since type is not known at compile time
{
//Forwarding to another rabbit instance, same handling for all types of messages
});
}
}
建立必要的订阅后,您只需要能够处理收到的各种消息。
用 Rebus 做到这一点的最佳方法是避免使用普通的消息处理管道(反序列化 => 查找处理程序 => 分派),而是以原始形式(即在其“传输消息”中)处理消息形式)。
您可以使用 Rebus 的传输消息转发功能来做到这一点。有了它,100% 的通用消息处理程序可能如下所示:
Configure.With(activator)
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "router-tjek"))
.Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
{
var headers = transportMessage.Headers; //< Dictionary<string, string>
var body = transportMessage.Body; //< byte[]
// handle the message here, e.g.
// by deserializing the body into a JObject,
// storing the bytes in a database, or by
// forwarding the message to another queue
return // appropriate forward action here
}))
.Start();
您可以在此处阅读更多相关信息:Transport message forwarding
我遇到了一个问题,我想使用 Rebus 在 rabbitmq 中订阅和处理消息。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。
我需要以某种方式让 Rebus 订阅和处理所有这些消息类型并将它们转发(发布)到另一个 rabbitmq 实例。我的服务本质上是转发消息,并在这样做时添加一个自定义的 rebus header。
问题是我不想为每种消息类型生成处理程序 类(因为无论消息类型如何,功能都是相同的)。我也不想每次在第三方程序集中添加新的消息类型时更新我的代码(编写新的处理程序 类)。
我尝试使用 TypeBuilder 为通过反射找到的每个类型动态创建消息处理程序 类,但感觉有点乱所以我希望有另一种方法吗?
下面的代码概述了我希望实现的目标,即使代码无法编译。
public void SubscribeAndHandleMessages()
{
// These types will be determined runtime by using reflection but thats omitted for clarity
var messageTypes = new List<Type>(){typeof(MessageA), typeof(MessageB)};
var activator = new BuiltinHandlerActivator();
Configure.With(activator)
.Transport(t => t.UseRabbitMq(_rabbitConnectionString, "MyQueue"))
.Start();
//Subscribe and register handlers
foreach (var type in messageTypes)
{
activator.Bus.Subscribe(type); //This works, I can see the queue subscribing to the correct topics
activator.Handle<type>(async (bus, context, message) => //This doesnt work since type is not known at compile time
{
//Forwarding to another rabbit instance, same handling for all types of messages
});
}
}
建立必要的订阅后,您只需要能够处理收到的各种消息。
用 Rebus 做到这一点的最佳方法是避免使用普通的消息处理管道(反序列化 => 查找处理程序 => 分派),而是以原始形式(即在其“传输消息”中)处理消息形式)。
您可以使用 Rebus 的传输消息转发功能来做到这一点。有了它,100% 的通用消息处理程序可能如下所示:
Configure.With(activator)
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "router-tjek"))
.Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
{
var headers = transportMessage.Headers; //< Dictionary<string, string>
var body = transportMessage.Body; //< byte[]
// handle the message here, e.g.
// by deserializing the body into a JObject,
// storing the bytes in a database, or by
// forwarding the message to another queue
return // appropriate forward action here
}))
.Start();
您可以在此处阅读更多相关信息:Transport message forwarding