在运行时为 Rebus 生成消息处理程序

Generating message handlers for Rebus in runtime

我遇到了一个问题,我想使用 Rebus 在 rabbitmq 中订阅和处理消息。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。

我需要以某种方式让 Rebus 订阅和处理所有这些消息类型并将它们转发(发布)到另一个 rabbitmq 实例。我的服务本质上是转发消息,并在这样做时添加一个自定义的 rebus header。

问题是我不想为每种消息类型生成处理程序 类(因为无论消息类型如何,功能都是相同的)。我也不想每次在第三方程序集中添加新的消息类型时更新我的​​代码(编写新的处理程序 类)。

我尝试使用 TypeBuilder 为通过反射找到的每个类型动态创建消息处理程序 类,但感觉有点乱所以我希望有另一种方法吗?

下面的代码概述了我希望实现的目标,即使代码无法编译。

public void SubscribeAndHandleMessages()
        {
            // These types will be determined runtime by using reflection but thats omitted for clarity
            var messageTypes = new List<Type>(){typeof(MessageA), typeof(MessageB)}; 

            var activator = new BuiltinHandlerActivator();

            Configure.With(activator)
                .Transport(t => t.UseRabbitMq(_rabbitConnectionString, "MyQueue"))
                .Start();

            //Subscribe and register handlers
            foreach (var type in messageTypes)
            {
                activator.Bus.Subscribe(type); //This works, I can see the queue subscribing to the correct topics
                activator.Handle<type>(async (bus, context, message) => //This doesnt work since type is not known at compile time
                {
                    //Forwarding to another rabbit instance, same handling for all types of messages
                });
            }
        }

建立必要的订阅后,您只需要能够处理收到的各种消息。

用 Rebus 做到这一点的最佳方法是避免使用普通的消息处理管道(反序列化 => 查找处理程序 => 分派),而是以原始形式(即在其“传输消息”中)处理消息形式)。

您可以使用 Rebus 的传输消息转发功能来做到这一点。有了它,100% 的通用消息处理程序可能如下所示:

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "router-tjek"))
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
    {
        var headers = transportMessage.Headers; //< Dictionary<string, string>
        var body = transportMessage.Body;       //< byte[]

        // handle the message here, e.g.
        // by deserializing the body into a JObject,
        // storing the bytes in a database, or by
        // forwarding the message to another queue
        return // appropriate forward action here
    }))
    .Start();

您可以在此处阅读更多相关信息:Transport message forwarding