在 MassTransit 中发布通用(基于接口)消息和使用具体(具体 class)消息
Publish generic (interface based) message and Consume concrete (concrete class) message in MassTransit
我有这个设计选择问题,不知何故苦苦挣扎但徒劳无功。它仅在特定情况下有效。
我正在尝试在 MassTransit 中发布和使用消息。
例如:(发布者 - 一个简单的控制台应用程序)
IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);
bus.Publish(message);
(消费者 - CircleConsumer 和 SquareConsumer)
class CircleConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var circle = context.Message as Circle;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var square = context.Message as Square;
return Task.CompletedTask;
}
}
(.Net Core 托管服务项目中的消费者配置)
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<Worker>()
.AddScoped<SquareConsumer>()
.AddScoped<CircleConsumer>()
.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
cfg.AddConsumer<SquareConsumer>();
cfg.AddConsumer<CircleConsumer>();
})
.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<IHostedService, TestMTConsumerHostedService>();
IBusControl ConfigureBus(IServiceProvider provider)
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
{
h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
});
cfg.ReceiveEndpoint("CircleQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<CircleConsumer>(provider);
});
cfg.ReceiveEndpoint("SquareQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<SquareConsumer>(provider);
});
});
}
});
我的要求是让 Publisher 在不知道具体内容的情况下发布消息 类。并且只有一个消费者根据消息类型接收消息。
但看起来消费者都在接收消息,而且转换也不起作用。 Desired:假设,当发布者发送 Square 对象时,只有 Square 消费者应该接收调用。但是,就我而言,SquareConsumer 和 CircleConsumer 都收到了消息。
作为一种变通方法,这可行:
始终发布具体对象。
bus.Publish(new Square());
使用具体类型声明消费者。
class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
var circle = context.Message;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
var square = context.Message;
return Task.CompletedTask;
}
}
但是,如果我能做到这一点就好了。
有什么建议吗?
如果您这样更改代码:
object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);
bus.Publish(message);
和消费者
class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
// do circle stuff
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
// do square stuff
}
}
它将按预期工作。
这里我详细说一下变化:
- 对特定类型的实例使用
Publish
意味着使用 Publish<T>(T message)
重载,它使用 T
作为消息类型。当显式地将消息类型设置为 object
时,我们调用 Publish(object message)
重载。在这种情况下,MassTransit 将查找消息实现的所有类型。
- 如果您的目标是使用具体类型的消息,则不需要使用共享接口类型的消息。您只需要为这些特定类型创建消费者。只要你像我在上一点中描述的那样使用发布,消息就会同时发送到
IShape
和 Circle
交换(例如)。
更新: 我最后采用了下面的方法。但是,我希望 MassTransit 能够路由本质上纯粹多态的消息。这只是一种解决方法,而不是真正的解决方案。仍然欢迎使用新方法。
在具体 classes 中使用反射和友元方法的帮助,我得到了这个。
出版商:
IShape message = GetShape(text);
var castedMessage = ReflectionHelper.CastToConcreteType(message);
bus.Publish(castedMessage);
public static class ReflectionHelper
{
public static object CastToConcreteType(object obj)
{
MethodInfo castMethod = obj.GetType().GetMethod("Cast").MakeGenericMethod(obj.GetType());
return castMethod.Invoke(null, new object[] { obj });
}
}
消息类型的接口和具体 类:
public interface IShape
{
public string Color { get; }
public string Name { get; }
}
public class Circle : IShape, ITypeCastable
{
public string Color => "Red";
public string Name => $"{Color}Circle";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public class Square : IShape, ITypeCastable
{
public string Color => "Green";
public string Name => $"{Color}Square";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public interface ITypeCastable
{
T Cast<T>(object obj);
}
消费者:一个非常小的变化,在通用类型供应中用具体的 class 名称替换接口。
class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
var circle = context.Message;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
var square = context.Message;
return Task.CompletedTask;
}
}
我有这个设计选择问题,不知何故苦苦挣扎但徒劳无功。它仅在特定情况下有效。
我正在尝试在 MassTransit 中发布和使用消息。 例如:(发布者 - 一个简单的控制台应用程序)
IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);
bus.Publish(message);
(消费者 - CircleConsumer 和 SquareConsumer)
class CircleConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var circle = context.Message as Circle;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var square = context.Message as Square;
return Task.CompletedTask;
}
}
(.Net Core 托管服务项目中的消费者配置)
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<Worker>()
.AddScoped<SquareConsumer>()
.AddScoped<CircleConsumer>()
.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
cfg.AddConsumer<SquareConsumer>();
cfg.AddConsumer<CircleConsumer>();
})
.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<IHostedService, TestMTConsumerHostedService>();
IBusControl ConfigureBus(IServiceProvider provider)
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
{
h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
});
cfg.ReceiveEndpoint("CircleQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<CircleConsumer>(provider);
});
cfg.ReceiveEndpoint("SquareQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<SquareConsumer>(provider);
});
});
}
});
我的要求是让 Publisher 在不知道具体内容的情况下发布消息 类。并且只有一个消费者根据消息类型接收消息。
但看起来消费者都在接收消息,而且转换也不起作用。 Desired:假设,当发布者发送 Square 对象时,只有 Square 消费者应该接收调用。但是,就我而言,SquareConsumer 和 CircleConsumer 都收到了消息。
作为一种变通方法,这可行:
始终发布具体对象。
bus.Publish(new Square());
使用具体类型声明消费者。
class CircleConsumer : IConsumer<Circle> { public Task Consume(ConsumeContext<Circle> context) { var circle = context.Message; return Task.CompletedTask; } } class SquareConsumer : IConsumer<Square> { public Task Consume(ConsumeContext<Square> context) { var square = context.Message; return Task.CompletedTask; } }
但是,如果我能做到这一点就好了。
有什么建议吗?
如果您这样更改代码:
object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);
bus.Publish(message);
和消费者
class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
// do circle stuff
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
// do square stuff
}
}
它将按预期工作。
这里我详细说一下变化:
- 对特定类型的实例使用
Publish
意味着使用Publish<T>(T message)
重载,它使用T
作为消息类型。当显式地将消息类型设置为object
时,我们调用Publish(object message)
重载。在这种情况下,MassTransit 将查找消息实现的所有类型。 - 如果您的目标是使用具体类型的消息,则不需要使用共享接口类型的消息。您只需要为这些特定类型创建消费者。只要你像我在上一点中描述的那样使用发布,消息就会同时发送到
IShape
和Circle
交换(例如)。
更新: 我最后采用了下面的方法。但是,我希望 MassTransit 能够路由本质上纯粹多态的消息。这只是一种解决方法,而不是真正的解决方案。仍然欢迎使用新方法。
在具体 classes 中使用反射和友元方法的帮助,我得到了这个。
出版商:
IShape message = GetShape(text);
var castedMessage = ReflectionHelper.CastToConcreteType(message);
bus.Publish(castedMessage);
public static class ReflectionHelper
{
public static object CastToConcreteType(object obj)
{
MethodInfo castMethod = obj.GetType().GetMethod("Cast").MakeGenericMethod(obj.GetType());
return castMethod.Invoke(null, new object[] { obj });
}
}
消息类型的接口和具体 类:
public interface IShape
{
public string Color { get; }
public string Name { get; }
}
public class Circle : IShape, ITypeCastable
{
public string Color => "Red";
public string Name => $"{Color}Circle";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public class Square : IShape, ITypeCastable
{
public string Color => "Green";
public string Name => $"{Color}Square";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public interface ITypeCastable
{
T Cast<T>(object obj);
}
消费者:一个非常小的变化,在通用类型供应中用具体的 class 名称替换接口。
class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
var circle = context.Message;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
var square = context.Message;
return Task.CompletedTask;
}
}