如何使用位于不同程序集中的 DTO 副本在两个微服务之间实现 Rebus 订阅/发布?
How implement Rebus subscription / publication between two microservices with copies of DTO located in different assemblies?
我在以下一般情况下使用 Rebus,对于 WCF 或 WebAPI 来说很自然。有两个微服务,在它们的程序集中具有相同的 DTO 集,用于交换集成事件。 DTO 类 类似,但它们驻留在具有不同 .Net Core 和 .Net Framework 框架的不同程序集中。我不喜欢在单独的共享程序集中创建 DTO。您如何基于位于不同程序集中的两个 DTO 副本实现订阅/发布以在这两个微服务之间交换集成事件?
第一个微服务示例:
public class Message
{
public int MessageNumber { get; set; }
}
class Program
{
static void Main()
{
using (var adapter = new BuiltinHandlerActivator())
{
Configure.With(adapter)
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
.Start();
var keepRunning = true;
while (keepRunning)
{
var key = char.ToLower(Console.ReadKey(true).KeyChar);
switch (key)
{
case 'f':
Console.WriteLine("Publishing {0} messages", 1000);
var jobs = Enumerable.Range(0, 1000)
.Select(i => new Message { MessageNumber = i })
.Select(m => adapter.Bus.Publish(m))
.ToArray();
Task.WaitAll(jobs);
break;
case 'q':
Console.WriteLine("Quitting");
keepRunning = false;
break;
}
}
}
}
}
第二个微服务示例:
public class Message
{
public int MessageNumber { get; set; }
}
public class MessageHandler : IHandleMessages<Message>
{
public async Task Handle(Message message)
{
Console.WriteLine("Processing message {0}", message.MessageNumber);
await Task.Delay(200);
}
}
class Program
{
private static IContainer _container;
static void Main()
{
var builder = new ContainerBuilder();
builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
builder.RegisterRebus((configurer, context) => configurer
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
//.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
.Options(o => o.SetMaxParallelism(5))
);
using (_container = builder.Build())
{
var eventBus = _container.Resolve<IBus>();
eventBus.Subscribe<Message>().Wait();
Console.WriteLine("Consumer listening - press ENTER to quit");
Console.ReadLine();
}
}
}
虽然 Rebus 对 type-based pub/sub 有突出的 APIs,但底层实现适用于普通 string-based 主题。
这意味着例如
await bus.Subscribe<YourMessage>();
和
await bus.Publish(new YourMessage());
简单地转换为主题 "YourMessages.YourMessage, YourMessages"
的订阅和发布操作(这里假装 YourMessage
位于 YourMessages
程序集中。
您可以使用主题 API 访问原始 topic-based API,您可以在此处获取:
var topics = bus.Advanced.Topics;
然后你就可以
await topics.Subscribe("to anything");
和
await topics.Publish("to anything", new YourMessage());
因此,为了解决您的问题,我建议您利用这一点并为每种事件类型确定一个共同主题。
我在以下一般情况下使用 Rebus,对于 WCF 或 WebAPI 来说很自然。有两个微服务,在它们的程序集中具有相同的 DTO 集,用于交换集成事件。 DTO 类 类似,但它们驻留在具有不同 .Net Core 和 .Net Framework 框架的不同程序集中。我不喜欢在单独的共享程序集中创建 DTO。您如何基于位于不同程序集中的两个 DTO 副本实现订阅/发布以在这两个微服务之间交换集成事件?
第一个微服务示例:
public class Message
{
public int MessageNumber { get; set; }
}
class Program
{
static void Main()
{
using (var adapter = new BuiltinHandlerActivator())
{
Configure.With(adapter)
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
.Start();
var keepRunning = true;
while (keepRunning)
{
var key = char.ToLower(Console.ReadKey(true).KeyChar);
switch (key)
{
case 'f':
Console.WriteLine("Publishing {0} messages", 1000);
var jobs = Enumerable.Range(0, 1000)
.Select(i => new Message { MessageNumber = i })
.Select(m => adapter.Bus.Publish(m))
.ToArray();
Task.WaitAll(jobs);
break;
case 'q':
Console.WriteLine("Quitting");
keepRunning = false;
break;
}
}
}
}
}
第二个微服务示例:
public class Message
{
public int MessageNumber { get; set; }
}
public class MessageHandler : IHandleMessages<Message>
{
public async Task Handle(Message message)
{
Console.WriteLine("Processing message {0}", message.MessageNumber);
await Task.Delay(200);
}
}
class Program
{
private static IContainer _container;
static void Main()
{
var builder = new ContainerBuilder();
builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
builder.RegisterRebus((configurer, context) => configurer
.Logging(l => l.ColoredConsole(LogLevel.Warn))
.Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
//.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
.Options(o => o.SetMaxParallelism(5))
);
using (_container = builder.Build())
{
var eventBus = _container.Resolve<IBus>();
eventBus.Subscribe<Message>().Wait();
Console.WriteLine("Consumer listening - press ENTER to quit");
Console.ReadLine();
}
}
}
虽然 Rebus 对 type-based pub/sub 有突出的 APIs,但底层实现适用于普通 string-based 主题。
这意味着例如
await bus.Subscribe<YourMessage>();
和
await bus.Publish(new YourMessage());
简单地转换为主题 "YourMessages.YourMessage, YourMessages"
的订阅和发布操作(这里假装 YourMessage
位于 YourMessages
程序集中。
您可以使用主题 API 访问原始 topic-based API,您可以在此处获取:
var topics = bus.Advanced.Topics;
然后你就可以
await topics.Subscribe("to anything");
和
await topics.Publish("to anything", new YourMessage());
因此,为了解决您的问题,我建议您利用这一点并为每种事件类型确定一个共同主题。