如何使用 MassTransit EndpointConvention.Map<T> 将消息类型映射到不同的 Azure ServiceBus 队列

How to map message type to a different Azure ServiceBus queue with MassTransit EndpointConvention.Map<T>

我想使用 MassTransit 将 message.Data 结构可能不同的消息发送到不同的 Azure 服务总线队列。只要 router.Name 保持初始值,它就可以正常工作。但是,只要 EndpointConvention.Map<ManyToOneTransferMessage> 的目标 Uri 发生变化,MassTransit 就会抛出异常 "The endpoint convention has already been created and can no longer be modified"。有什么方法可以将消息类型重新映射到另一个目的地,以便将 MassTransit 与多个队列一起使用?

public class AzureServiceBusManager
{
    string ServiceBusConnectionString = string.Empty;

    public AzureServiceBusManager()
    {
        ServiceBusConnectionString = ConfigurationManager.AppSettings["AppSettings:ServiceBusConnectionString"];
    }

    public async Task SendMessageAsyncN1(TransferMessage transferMessage, Router router)
    {
        var message = new ManyToOneTransferMessage
        {
            BlobFileName = transferMessage.BlobFileName,
            Compressed = transferMessage.Compressed,
            Data = transferMessage.Data,
            MessageId = transferMessage.MessageId,
            TransferId = transferMessage.TransferId,
            TransferType = transferMessage.TransferType
        };

        var queueBusControl = Bus.Factory.CreateUsingAzureServiceBus(
                    cfg =>
                    {
                        cfg.Host(ServiceBusConnectionString);
                        EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));
                        cfg.ReceiveEndpoint(router.Name, e =>
                        {
                            e.RequiresSession = true;
                            e.MaxConcurrentCalls = 500;
                        });
                    });

        await queueBusControl.Send(message);
    }
}

所以,首先,不要使用EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));。它没有用,只会增加混乱。

您可以从总线解析端点,但您必须意识到为每个调用创建总线是一个坏主意。最好在启动时启动总线(您甚至没有在上面的代码中启动它),并在应用程序关闭时停止它。

然后,对于每个调用,您可以使用该总线来解析发送端点并发送消息。

var endpoint = await bus.GetSendEndpoint(new Uri("queue:" + router.Name));

await endpoint.Send(message);

此外,您应该删除它,因为它会导致所有消息被移动到 _skipped 队列:

cfg.ReceiveEndpoint(router.Name, e =>
{
    e.RequiresSession = true;
    e.MaxConcurrentCalls = 500;
});

如果您需要 Session,您可能需要提前单独配置队列,尽管我没有看到您在消息上设置 SessionId,因此如果没有它可能无法正常工作。