MassTransit 未订阅 AzureServiceBus 主题
MassTransit not subscribing to AzureServiceBus Topic
我目前正在尝试使用 MassTransit 6.3.2 更新最初是 .NET Core 3.1 的应用程序。现在配置为使用 .NET 6.0 和 MassTransit 7.3.0
我们的应用程序使用 MassTransit 通过 Azure 服务总线发送消息,将消息发布到主题,然后让其他订阅者收听这些主题。
删减,是这样实现的:
// Program.cs
services.AddMassTransit(config =>
{
config.AddConsumer<AppointmentBookedMessageConsumer>();
config.AddBus(BusControlFactory.ConfigureAzureServiceBus);
});
// BusControlFactory.cs
public static class BusControlFactory
{
public static IBusControl ConfigureAzureServiceBus(IRegistrationContext<IServiceProvider> context)
{
var config = context.Container.GetService<AppConfiguration>();
var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
{
busFactoryConfig.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
busFactoryConfig.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));
busFactoryConfig.SubscriptionEndpoint<AppointmentBookedMessage>(
"my-subscriber-name",
configurator =>
{
configurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
configurator.Consumer<AppointmentBookedMessageConsumer>(context.Container);
});
return azureServiceBus;
}
}
}
它现在已经更改并升级到最新的 MassTransit 并且实现如下:
// Program.cs
services.AddMassTransit(config =>
{
config.AddConsumer<AppointmentBookedMessageConsumer, AppointmentBookedMessageConsumerDefinition>();
config.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
cfg.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));
cfg.ConfigureEndpoints(context);
});
// AppointmentBookedMessageConsumerDefinition.cs
public class AppointmentBookedMessageConsumerDefinition: ConsumerDefinition<AppointmentBookedMessageConsumer>
{
public AppointmentBookedMessageConsumerDefinition()
{
EndpointName = "testharness.subscriber";
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<AppointmentBookedMessageConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
}
}
问题如果可以算一个的话,就是我无法绑定到已经存在的订阅。
在上面的示例中,您可以看到 EndpointName
设置为“testharness.subscriber”。在我升级之前,已经订阅了主题“预约”。但是,当应用程序运行时,它不会出错,但不会收到任何消息。
如果我将 EndpointName
更改为“testharness.subscriber2”。另一个订阅者出现在 Azure 服务总线主题中(通过 Azure 门户),我开始接收消息。我看不出名称有什么不同(除了我放置的更改,在这种情况下:“2”后缀)。
我是不是漏掉了什么?我还需要做些什么来绑定它们吗?我的配置错了吗?错了吗?虽然我确定我可以通过更密切地管理发布并在他们使用新队列后删除不需要的队列来解决这个问题 - 但感觉这是错误的方法。
使用 Azure 服务总线,ForwardTo
订阅可能有点不透明。
虽然订阅可能确实在视觉上表明它正在转发到正确的命名队列,但可能队列已被删除并在某个时候重新创建而不删除订阅。这会导致订阅建立消息,因为它无法将消息转发到不再存在的队列。
为什么?在内部,订阅将 ForwardTo
作为 object id 维护,在队列被删除后指向一个不存在的对象——导致消息在订阅中累积.
如果您的订阅中有消息,您可能需要进入门户并更新该订阅以指向新队列(即使它具有相同的名称) ,此时消息 应该 流向队列。
如果订阅中没有任何消息(或者它们不重要),您可以删除订阅,MassTransit 会在您重新启动公交车时重新创建它。
我目前正在尝试使用 MassTransit 6.3.2 更新最初是 .NET Core 3.1 的应用程序。现在配置为使用 .NET 6.0 和 MassTransit 7.3.0
我们的应用程序使用 MassTransit 通过 Azure 服务总线发送消息,将消息发布到主题,然后让其他订阅者收听这些主题。
删减,是这样实现的:
// Program.cs
services.AddMassTransit(config =>
{
config.AddConsumer<AppointmentBookedMessageConsumer>();
config.AddBus(BusControlFactory.ConfigureAzureServiceBus);
});
// BusControlFactory.cs
public static class BusControlFactory
{
public static IBusControl ConfigureAzureServiceBus(IRegistrationContext<IServiceProvider> context)
{
var config = context.Container.GetService<AppConfiguration>();
var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
{
busFactoryConfig.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
busFactoryConfig.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));
busFactoryConfig.SubscriptionEndpoint<AppointmentBookedMessage>(
"my-subscriber-name",
configurator =>
{
configurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
configurator.Consumer<AppointmentBookedMessageConsumer>(context.Container);
});
return azureServiceBus;
}
}
}
它现在已经更改并升级到最新的 MassTransit 并且实现如下:
// Program.cs
services.AddMassTransit(config =>
{
config.AddConsumer<AppointmentBookedMessageConsumer, AppointmentBookedMessageConsumerDefinition>();
config.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://REDACTED-queues.servicebus.windows.net/;SharedAccessKeyName=MyMessageQueuing;SharedAccessKey=MyKeyGoesHere");
cfg.Message<AppointmentBookedMessage>(m => m.SetEntityName("appointment-booked"));
cfg.ConfigureEndpoints(context);
});
// AppointmentBookedMessageConsumerDefinition.cs
public class AppointmentBookedMessageConsumerDefinition: ConsumerDefinition<AppointmentBookedMessageConsumer>
{
public AppointmentBookedMessageConsumerDefinition()
{
EndpointName = "testharness.subscriber";
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<AppointmentBookedMessageConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Interval(5, TimeSpan.FromSeconds(60)));
}
}
问题如果可以算一个的话,就是我无法绑定到已经存在的订阅。
在上面的示例中,您可以看到 EndpointName
设置为“testharness.subscriber”。在我升级之前,已经订阅了主题“预约”。但是,当应用程序运行时,它不会出错,但不会收到任何消息。
如果我将 EndpointName
更改为“testharness.subscriber2”。另一个订阅者出现在 Azure 服务总线主题中(通过 Azure 门户),我开始接收消息。我看不出名称有什么不同(除了我放置的更改,在这种情况下:“2”后缀)。
我是不是漏掉了什么?我还需要做些什么来绑定它们吗?我的配置错了吗?错了吗?虽然我确定我可以通过更密切地管理发布并在他们使用新队列后删除不需要的队列来解决这个问题 - 但感觉这是错误的方法。
使用 Azure 服务总线,ForwardTo
订阅可能有点不透明。
虽然订阅可能确实在视觉上表明它正在转发到正确的命名队列,但可能队列已被删除并在某个时候重新创建而不删除订阅。这会导致订阅建立消息,因为它无法将消息转发到不再存在的队列。
为什么?在内部,订阅将 ForwardTo
作为 object id 维护,在队列被删除后指向一个不存在的对象——导致消息在订阅中累积.
如果您的订阅中有消息,您可能需要进入门户并更新该订阅以指向新队列(即使它具有相同的名称) ,此时消息 应该 流向队列。
如果订阅中没有任何消息(或者它们不重要),您可以删除订阅,MassTransit 会在您重新启动公交车时重新创建它。