如何使用 MassTransit 和 RabbitMQ 跨微服务共享消息定义?
How to share message definition across microservices using MassTransit with RabbitMQ?
我是微服务的新手,在使用 MassTransit 框架连接发布者和订阅者时遇到了一点问题。创建用户时,我在其中一项服务中生成了一个示例集成事件。定义如下:
public sealed record UserCreatedIntegrationEvent : IntegrationEvent
{
public UserCreatedIntegrationEvent(Guid id,
string login,
string firstName,
string lastName,
string mailAddress)
: base(id,
nameof(UserCreatedIntegrationEvent))
{
Login = login;
FirstName = firstName;
LastName = lastName;
MailAddress = mailAddress;
}
public string Login { get; }
public string FirstName { get; }
public string LastName { get; }
public string MailAddress { get; }
}
消息包含在 User.Application 项目中。消息正在消息代理总线上正确发布。现在我需要在其他服务中接收一个事件,所以我需要知道那里的 UserCreatedIntegrationEvent 的定义,反序列化它等。我可以参考 User.Application 项目,但在我看来它可能会导致一些问题,一般来说我认为这种方法违反了微服务自治规则。另一种解决方案是在接收方服务中复制消息定义。然后我复制 UserCreatedIntegrationEvent 并将新粘贴的事件连接到特定的处理程序:
public abstract class IntegrationEventHandler<TIntegrationEvent> : IConsumer<TIntegrationEvent>
where TIntegrationEvent : IntegrationEvent
{
protected ConsumeContext<TIntegrationEvent> ConsumeContext { get; private set; }
public async Task Consume(ConsumeContext<TIntegrationEvent> context)
{
ConsumeContext = context;
await HandleAsync(context.Message);
}
public abstract Task HandleAsync(TIntegrationEvent @event);
}
...
public sealed class UserCreatedIntegrationEventHandler : IntegrationEventHandler<UserCreatedIntegrationEvent>
{
public override async Task HandleAsync(UserCreatedIntegrationEvent @event)
{
throw new System.NotImplementedException();
}
}
问题是,即使我复制了事件定义,所以它完全一样,但不会调用相应处理程序中的 HandleAsync 方法。但是当我尝试直接从其他服务引用 User.Application.UserCreatedIntegrationEvent 时,该方法被正确调用,但我不太喜欢这个解决方案。如何妥善解决问题?我是否应该在其他服务中复制定义(如何使用 MassTransit 连接它们),也许我应该将合同移动到其他一些包并从这两个服务中引用包?负责broker依赖注册的代码:
internal static IServiceCollection AddRabbitMQ(this IServiceCollection services,
IConfiguration configuration,
bool useHealthCheck,
Assembly consumersAssembly)
{
var settingsSection = configuration.GetSection(RabbitMQSettingsSectionKey);
var rabbitMQSettings = settingsSection.Get<RabbitMQSettings>();
services
.AddMassTransit(configurator =>
{
configurator.AddConsumers(consumersAssembly);
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
busFactoryConfigurator
.Host(rabbitMQSettings.HostName,
rabbitMQSettings.VirtualHostName,
hostConfigurator =>
{
hostConfigurator.Username(rabbitMQSettings.UserName);
hostConfigurator.Password(rabbitMQSettings.Password);
});
busFactoryConfigurator.ConfigureEndpoints(context);
});
})
.AddMassTransitHostedService()
.Configure<RabbitMQSettings>(settingsSection)
.AddScoped<IIntegrationEventPublisher, EventBus>();
if (useHealthCheck)
{
services
.AddHealthChecks()
.AddRabbitMQ(string.Format(RabbitMQConnectionStringPattern,
rabbitMQSettings.HostName),
name: RabbitConnectionCheckName,
tags: new[] { DefaultRabbitMQTag });
}
return services;
}
感谢您的帮助!
如果您将消息 class 从发布者复制到消费者,您需要确保消息类型相同,包括命名空间。这是文档中的 clearly highlighted。如果类型不匹配,那就可以解释为什么它没有被服务使用。
复制文件或与合同共享 NuGet 包这两种方法都可以。两者都被广泛使用。 MassTransit guidelines 不断发展消息合同以确保向后兼容性。
来自docs:
Important
MassTransit uses the full type name, including the namespace, for
message contracts. When creating the same message type in two separate
projects, the namespaces must match or the message will not be
consumed.
常见的问题是,虽然合同按名称和所有属性匹配,但命名空间不同。消费者将尝试绑定到包含命名空间的交换。
您可以在 RMQ 管理 UI 中通过查看消费者队列及其绑定轻松验证这一点。很可能你有两个不同的交换:一个,所有发布的消息都去(并消失),另一个,消费者队列绑定到,保持空闲。
我是微服务的新手,在使用 MassTransit 框架连接发布者和订阅者时遇到了一点问题。创建用户时,我在其中一项服务中生成了一个示例集成事件。定义如下:
public sealed record UserCreatedIntegrationEvent : IntegrationEvent
{
public UserCreatedIntegrationEvent(Guid id,
string login,
string firstName,
string lastName,
string mailAddress)
: base(id,
nameof(UserCreatedIntegrationEvent))
{
Login = login;
FirstName = firstName;
LastName = lastName;
MailAddress = mailAddress;
}
public string Login { get; }
public string FirstName { get; }
public string LastName { get; }
public string MailAddress { get; }
}
消息包含在 User.Application 项目中。消息正在消息代理总线上正确发布。现在我需要在其他服务中接收一个事件,所以我需要知道那里的 UserCreatedIntegrationEvent 的定义,反序列化它等。我可以参考 User.Application 项目,但在我看来它可能会导致一些问题,一般来说我认为这种方法违反了微服务自治规则。另一种解决方案是在接收方服务中复制消息定义。然后我复制 UserCreatedIntegrationEvent 并将新粘贴的事件连接到特定的处理程序:
public abstract class IntegrationEventHandler<TIntegrationEvent> : IConsumer<TIntegrationEvent>
where TIntegrationEvent : IntegrationEvent
{
protected ConsumeContext<TIntegrationEvent> ConsumeContext { get; private set; }
public async Task Consume(ConsumeContext<TIntegrationEvent> context)
{
ConsumeContext = context;
await HandleAsync(context.Message);
}
public abstract Task HandleAsync(TIntegrationEvent @event);
}
...
public sealed class UserCreatedIntegrationEventHandler : IntegrationEventHandler<UserCreatedIntegrationEvent>
{
public override async Task HandleAsync(UserCreatedIntegrationEvent @event)
{
throw new System.NotImplementedException();
}
}
问题是,即使我复制了事件定义,所以它完全一样,但不会调用相应处理程序中的 HandleAsync 方法。但是当我尝试直接从其他服务引用 User.Application.UserCreatedIntegrationEvent 时,该方法被正确调用,但我不太喜欢这个解决方案。如何妥善解决问题?我是否应该在其他服务中复制定义(如何使用 MassTransit 连接它们),也许我应该将合同移动到其他一些包并从这两个服务中引用包?负责broker依赖注册的代码:
internal static IServiceCollection AddRabbitMQ(this IServiceCollection services,
IConfiguration configuration,
bool useHealthCheck,
Assembly consumersAssembly)
{
var settingsSection = configuration.GetSection(RabbitMQSettingsSectionKey);
var rabbitMQSettings = settingsSection.Get<RabbitMQSettings>();
services
.AddMassTransit(configurator =>
{
configurator.AddConsumers(consumersAssembly);
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingRabbitMq((context, busFactoryConfigurator) =>
{
busFactoryConfigurator
.Host(rabbitMQSettings.HostName,
rabbitMQSettings.VirtualHostName,
hostConfigurator =>
{
hostConfigurator.Username(rabbitMQSettings.UserName);
hostConfigurator.Password(rabbitMQSettings.Password);
});
busFactoryConfigurator.ConfigureEndpoints(context);
});
})
.AddMassTransitHostedService()
.Configure<RabbitMQSettings>(settingsSection)
.AddScoped<IIntegrationEventPublisher, EventBus>();
if (useHealthCheck)
{
services
.AddHealthChecks()
.AddRabbitMQ(string.Format(RabbitMQConnectionStringPattern,
rabbitMQSettings.HostName),
name: RabbitConnectionCheckName,
tags: new[] { DefaultRabbitMQTag });
}
return services;
}
感谢您的帮助!
如果您将消息 class 从发布者复制到消费者,您需要确保消息类型相同,包括命名空间。这是文档中的 clearly highlighted。如果类型不匹配,那就可以解释为什么它没有被服务使用。
复制文件或与合同共享 NuGet 包这两种方法都可以。两者都被广泛使用。 MassTransit guidelines 不断发展消息合同以确保向后兼容性。
来自docs:
Important
MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed.
常见的问题是,虽然合同按名称和所有属性匹配,但命名空间不同。消费者将尝试绑定到包含命名空间的交换。
您可以在 RMQ 管理 UI 中通过查看消费者队列及其绑定轻松验证这一点。很可能你有两个不同的交换:一个,所有发布的消息都去(并消失),另一个,消费者队列绑定到,保持空闲。