在 MassTransit 中发布通用(基于接口)消息和使用具体(具体 class)消息

Publish generic (interface based) message and Consume concrete (concrete class) message in MassTransit

我有这个设计选择问题,不知何故苦苦挣扎但徒劳无功。它仅在特定情况下有效。

我正在尝试在 MassTransit 中发布和使用消息。 例如:(发布者 - 一个简单的控制台应用程序)

IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

(消费者 - CircleConsumer 和 SquareConsumer)

 class CircleConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var circle = context.Message as Circle;
            return Task.CompletedTask;
        }
    }

    class SquareConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var square = context.Message as Square;
            return Task.CompletedTask;
        }
    }

(.Net Core 托管服务项目中的消费者配置)

 public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>()
                   .AddScoped<SquareConsumer>()
                    .AddScoped<CircleConsumer>()
                    .AddMassTransit(cfg =>
                     {
                         cfg.AddBus(ConfigureBus);
                         cfg.AddConsumer<SquareConsumer>();
                         cfg.AddConsumer<CircleConsumer>();
                     })
                    .AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
                    .AddSingleton<IHostedService, TestMTConsumerHostedService>();

                    IBusControl ConfigureBus(IServiceProvider provider)
                    {
                        return Bus.Factory.CreateUsingRabbitMq(cfg =>
                        {
                            var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
                            {
                                h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
                                h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
                            });

                            cfg.ReceiveEndpoint("CircleQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<CircleConsumer>(provider);
                            });

                            cfg.ReceiveEndpoint("SquareQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<SquareConsumer>(provider);
                            });
                        });
                    }

                });

我的要求是让 Publisher 在不知道具体内容的情况下发布消息 类。并且只有一个消费者根据消息类型接收消息。

但看起来消费者都在接收消息,而且转换也不起作用。 Desired:假设,当发布者发送 Square 对象时,只有 Square 消费者应该接收调用。但是,就我而言,SquareConsumer 和 CircleConsumer 都收到了消息。

作为一种变通方法,这可行:

  1. 始终发布具体对象。

            bus.Publish(new Square());
    
  2. 使用具体类型声明消费者。

           class CircleConsumer : IConsumer<Circle>
            {
                public Task Consume(ConsumeContext<Circle> context)
                {
                    var circle = context.Message;
                    return Task.CompletedTask;
                }
            }
    
            class SquareConsumer : IConsumer<Square>
            {
                public Task Consume(ConsumeContext<Square> context)
                {
                    var square = context.Message;
                    return Task.CompletedTask;
                }
            }
    

但是,如果我能做到这一点就好了。

有什么建议吗?

如果您这样更改代码:

object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

和消费者

class CircleConsumer : IConsumer<Circle>
{
    public Task Consume(ConsumeContext<Circle> context)
    {
        // do circle stuff
    }
}

class SquareConsumer : IConsumer<Square>
{
    public Task Consume(ConsumeContext<Square> context)
    {
        // do square stuff
    }
}

它将按预期工作。

这里我详细说一下变化:

  1. 对特定类型的实例使用 Publish 意味着使用 Publish<T>(T message) 重载,它使用 T 作为消息类型。当显式地将消息类型设置为 object 时,我们调用 Publish(object message) 重载。在这种情况下,MassTransit 将查找消息实现的所有类型。
  2. 如果您的目标是使用具体类型的消息,则不需要使用共享接口类型的消息。您只需要为这些特定类型创建消费者。只要你像我在上一点中描述的那样使用发布,消息就会同时发送到 IShapeCircle 交换(例如)。

更新: 我最后采用了下面的方法。但是,我希望 MassTransit 能够路由本质上纯粹多态的消息。这只是一种解决方法,而不是真正的解决方案。仍然欢迎使用新方法。

在具体 classes 中使用反射和友元方法的帮助,我得到了这个。

出版商:

            IShape message = GetShape(text);
            var castedMessage = ReflectionHelper.CastToConcreteType(message);
            bus.Publish(castedMessage);

        public static class ReflectionHelper
        {
            public static object CastToConcreteType(object obj)
            {
                MethodInfo castMethod = obj.GetType().GetMethod("Cast").MakeGenericMethod(obj.GetType());
                return castMethod.Invoke(null, new object[] { obj });
            }
        }

消息类型的接口和具体 类:

    public interface IShape
        {
            public string Color { get;  }
            public string Name { get; }
        }

        public class Circle : IShape, ITypeCastable
    {
            public string Color => "Red";
            public string Name => $"{Color}Circle";
            T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
            public static T Cast<T>(object o) => (T)o;
        }
        public class Square : IShape, ITypeCastable
        {
            public string Color => "Green";
            public string Name => $"{Color}Square";
            T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
            public static T Cast<T>(object o) => (T)o;
        }

        public interface ITypeCastable
        {
            T Cast<T>(object obj);
        }

消费者:一个非常小的变化,在通用类型供应中用具体的 class 名称替换接口。

    class CircleConsumer : IConsumer<Circle>
    {
        public Task Consume(ConsumeContext<Circle> context)
        {
            var circle = context.Message;
            return Task.CompletedTask;
        }
    }

    class SquareConsumer : IConsumer<Square>
    {
        public Task Consume(ConsumeContext<Square> context)
        {
            var square = context.Message;
            return Task.CompletedTask;
        }
    }