如何使用位于不同程序集中的 DTO 副本在两个微服务之间实现 Rebus 订阅/发布?

How implement Rebus subscription / publication between two microservices with copies of DTO located in different assemblies?

我在以下一般情况下使用 Rebus,对于 WCF 或 WebAPI 来说很自然。有两个微服务,在它们的程序集中具有相同的 DTO 集,用于交换集成事件。 DTO 类 类似,但它们驻留在具有不同 .Net Core 和 .Net Framework 框架的不同程序集中。我不喜欢在单独的共享程序集中创建 DTO。您如何基于位于不同程序集中的两个 DTO 副本实现订阅/发布以在这两个微服务之间交换集成事件?

第一个微服务示例:

public class Message
{
    public int MessageNumber { get; set; }
}

class Program
{
    static void Main()
    {
        using (var adapter = new BuiltinHandlerActivator())
        {
            Configure.With(adapter)
                .Logging(l => l.ColoredConsole(LogLevel.Warn))
                .Transport(t => t.UseRabbitMqAsOneWayClient("amqp://myRebbitMQ"))
                .Start();

            var keepRunning = true;
            while (keepRunning)
            {
                var key = char.ToLower(Console.ReadKey(true).KeyChar);
                switch (key)
                {
                    case 'f':
                        Console.WriteLine("Publishing {0} messages", 1000);
                        var jobs = Enumerable.Range(0, 1000)
                            .Select(i => new Message { MessageNumber = i })
                            .Select(m => adapter.Bus.Publish(m))
                            .ToArray();
                        Task.WaitAll(jobs);
                        break;
                    case 'q':
                        Console.WriteLine("Quitting");
                        keepRunning = false;
                        break;
                }
            }
        }
    }
}

第二个微服务示例:

public class Message
{
    public int MessageNumber { get; set; }
}

public class MessageHandler : IHandleMessages<Message>
{
    public async Task Handle(Message message)
    {
        Console.WriteLine("Processing message {0}", message.MessageNumber);

        await Task.Delay(200);
    }
}

class Program
{
    private static IContainer _container;
    static void Main()
    {
        var builder = new ContainerBuilder();
        builder.RegisterType<MessageHandler>().As(typeof(IHandleMessages<>).MakeGenericType(typeof(Message)));
        builder.RegisterRebus((configurer, context) => configurer
            .Logging(l => l.ColoredConsole(LogLevel.Warn))
            .Transport(t => t.UseRabbitMq("amqp://myRebbitMQ", "consumer2"))
            //.Serialization(s => s.UseNewtonsoftJson(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects })) ???
            .Options(o => o.SetMaxParallelism(5))
        );
        using (_container = builder.Build())
        {
            var eventBus = _container.Resolve<IBus>();
            eventBus.Subscribe<Message>().Wait();
            Console.WriteLine("Consumer listening - press ENTER to quit");
            Console.ReadLine();
        }
    }
}

虽然 Rebus 对 type-based pub/sub 有突出的 APIs,但底层实现适用于普通 string-based 主题。

这意味着例如

await bus.Subscribe<YourMessage>();

await bus.Publish(new YourMessage());

简单地转换为主题 "YourMessages.YourMessage, YourMessages" 的订阅和发布操作(这里假装 YourMessage 位于 YourMessages 程序集中。

您可以使用主题 API 访问原始 topic-based API,您可以在此处获取:

var topics = bus.Advanced.Topics;

然后你就可以

await topics.Subscribe("to anything");

await topics.Publish("to anything", new YourMessage());

因此,为了解决您的问题,我建议您利用这一点并为每种事件类型确定一个共同主题。