使用 MassTransit 发布到 Azure 服务总线导致错误
Publish to Azure Service Bus using MassTransit is causing errors
我是 MassTransit 和 Azure 服务总线的新手。我正在尝试使用在 .NET Core 3.1 API 中使用 RabbitMq 或 Azure 服务总线的架构。我让 RabbitMq 部分正常工作,并且刚刚开始使用 Azure 服务总线。我有一个 API 将接收传入的有效负载并将其发布到队列。当我尝试通过 Azure 服务总线方法发布时,出现错误 "SubCode=40000。无法对类型主题进行操作,因为命名空间 'servicehubqa' 正在使用 'Basic' 层。
我正在尝试使用队列方法,并希望在发布消息时创建队列。目前,服务总线使用基本定价层,因为文档说我可以在该级别使用队列。我不确定是否需要手动创建队列(我必须使用 RabbitMq 执行此方法,因为如果不存在消费者则不会创建队列)。如果未指定任何内容,主题是默认方法吗?如何指定队列与主题?
我的代码如下
启动 - 配置服务
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(Configuration);
services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();
var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
switch (messageProvider)
{
case "AzureServiceBus":
services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
break;
case "RabbitMq":
services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddControllers();
}
控制器
public class ListenerController : ControllerBase
{
readonly ILogger<ListenerController> logger;
readonly IMessagingService messenger;
public ListenerController(
ILogger<ListenerController> logger,
IMessagingService messenger)
{
this.logger = logger;
this.messenger = messenger;
}
[HttpPost]
public async Task<IActionResult> Post()
{
var payload = new
{
...
};
await messenger.Publish(payload);
return Ok();
}
}
IMessagingService
public interface IMessagingService
{
Task Publish(object payload);
}
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }
public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
public IBusControl BusControl { get; }
public MassTransitAzureServiceBusTransport()
{
BusControl = ConfigureBus();
BusControl.StartAsync();
}
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(config => {
var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
});
}
}
MassTransitAzureServiceBusMessagingService
public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
readonly IMassTransitAzureServiceBusTransport massTransitTransport;
public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
//IRegisterCommandUpdateStatus is an interface specifying the properties needed
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
}
Azure 服务总线基本层不允许使用主题。所以你将无法使用发布。也就是说,MassTransit 并不真正适用于基本层,尽管过去的尝试可能已经成功。
MassTransit 文档确实指出,如果您想使用一个主题(即同时发布到多个订阅的能力),您可以使用发布。
如果要将消息发送到队列(消息路由到特定位置),请使用发送并提供正确的信息。
主题需要标准定价,队列可以使用基本定价。
根据此信息,MassTransitAzureServiceBusMessagingService 将修改如下:
基本定价 - 队列
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
标准定价 - Topics/Subscriptions
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
我是 MassTransit 和 Azure 服务总线的新手。我正在尝试使用在 .NET Core 3.1 API 中使用 RabbitMq 或 Azure 服务总线的架构。我让 RabbitMq 部分正常工作,并且刚刚开始使用 Azure 服务总线。我有一个 API 将接收传入的有效负载并将其发布到队列。当我尝试通过 Azure 服务总线方法发布时,出现错误 "SubCode=40000。无法对类型主题进行操作,因为命名空间 'servicehubqa' 正在使用 'Basic' 层。
我正在尝试使用队列方法,并希望在发布消息时创建队列。目前,服务总线使用基本定价层,因为文档说我可以在该级别使用队列。我不确定是否需要手动创建队列(我必须使用 RabbitMq 执行此方法,因为如果不存在消费者则不会创建队列)。如果未指定任何内容,主题是默认方法吗?如何指定队列与主题?
我的代码如下
启动 - 配置服务
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(Configuration);
services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();
var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
switch (messageProvider)
{
case "AzureServiceBus":
services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
break;
case "RabbitMq":
services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddControllers();
}
控制器
public class ListenerController : ControllerBase
{
readonly ILogger<ListenerController> logger;
readonly IMessagingService messenger;
public ListenerController(
ILogger<ListenerController> logger,
IMessagingService messenger)
{
this.logger = logger;
this.messenger = messenger;
}
[HttpPost]
public async Task<IActionResult> Post()
{
var payload = new
{
...
};
await messenger.Publish(payload);
return Ok();
}
}
IMessagingService
public interface IMessagingService
{
Task Publish(object payload);
}
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }
public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
public IBusControl BusControl { get; }
public MassTransitAzureServiceBusTransport()
{
BusControl = ConfigureBus();
BusControl.StartAsync();
}
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(config => {
var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
});
}
}
MassTransitAzureServiceBusMessagingService
public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
readonly IMassTransitAzureServiceBusTransport massTransitTransport;
public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
//IRegisterCommandUpdateStatus is an interface specifying the properties needed
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
}
Azure 服务总线基本层不允许使用主题。所以你将无法使用发布。也就是说,MassTransit 并不真正适用于基本层,尽管过去的尝试可能已经成功。
MassTransit 文档确实指出,如果您想使用一个主题(即同时发布到多个订阅的能力),您可以使用发布。 如果要将消息发送到队列(消息路由到特定位置),请使用发送并提供正确的信息。
主题需要标准定价,队列可以使用基本定价。
根据此信息,MassTransitAzureServiceBusMessagingService 将修改如下:
基本定价 - 队列
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
标准定价 - Topics/Subscriptions
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}