使用 MassTransit 发布到 Azure 服务总线导致错误

Publish to Azure Service Bus using MassTransit is causing errors

我是 MassTransit 和 Azure 服务总线的新手。我正在尝试使用在 .NET Core 3.1 API 中使用 RabbitMq 或 Azure 服务总线的架构。我让 RabbitMq 部分正常工作,并且刚刚开始使用 Azure 服务总线。我有一个 API 将接收传入的有效负载并将其发布到队列。当我尝试通过 Azure 服务总线方法发布时,出现错误 "SubCode=40000。无法对类型主题进行操作,因为命名空间 'servicehubqa' 正在使用 'Basic' 层。

我正在尝试使用队列方法,并希望在发布消息时创建队列。目前,服务总线使用基本定价层,因为文档说我可以在该级别使用队列。我不确定是否需要手动创建队列(我必须使用 RabbitMq 执行此方法,因为如果不存在消费者则不会创建队列)。如果未指定任何内容,主题是默认方法吗?如何指定队列与主题?

我的代码如下

启动 - 配置服务

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton(Configuration);

        services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
        services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();

        var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
        switch (messageProvider)
        {
            case "AzureServiceBus":
                services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
                break;
            case "RabbitMq":
                services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
                break;
            default:
                throw new ArgumentException("Invalid message service");
        };

        services.AddControllers();
    }

控制器

public class ListenerController : ControllerBase
{
    readonly ILogger<ListenerController> logger;
    readonly IMessagingService messenger;

    public ListenerController(
        ILogger<ListenerController> logger,
        IMessagingService messenger)
    {
        this.logger = logger;
        this.messenger = messenger;
    }

    [HttpPost]
    public async Task<IActionResult> Post()
    {
        var payload = new
        {
            ...
        };

        await messenger.Publish(payload);

        return Ok();
    }
}

IMessagingService

public interface IMessagingService
{
    Task Publish(object payload);
}

IMassTransitTransport

public interface IMassTransitTransport
{
    IBusControl BusControl { get; }
}

public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }

public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }

MassTransitAzureServiceBusTransport

public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
    public IBusControl BusControl { get; }

    public MassTransitAzureServiceBusTransport()
    {
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingAzureServiceBus(config => {
            var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
        });
    }
}

MassTransitAzureServiceBusMessagingService

public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
    readonly IMassTransitAzureServiceBusTransport massTransitTransport;

    public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
    {
        //transport bus config already happens in massTransitTransport constructor
        this.massTransitTransport = massTransitTransport;
    }

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                //IRegisterCommandUpdateStatus is an interface specifying the properties needed
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }
}

Azure 服务总线基本层不允许使用主题。所以你将无法使用发布。也就是说,MassTransit 并不真正适用于基本层,尽管过去的尝试可能已经成功。

MassTransit 文档确实指出,如果您想使用一个主题(即同时发布到多个订阅的能力),您可以使用发布。 如果要将消息发送到队列(消息路由到特定位置),请使用发送并提供正确的信息。

主题需要标准定价,队列可以使用基本定价。

根据此信息,MassTransitAzureServiceBusMessagingService 将修改如下:

基本定价 - 队列

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
                var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
                await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }

标准定价 - Topics/Subscriptions

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }