如何使用 MassTransit EndpointConvention.Map<T> 将消息类型映射到不同的 Azure ServiceBus 队列
How to map message type to a different Azure ServiceBus queue with MassTransit EndpointConvention.Map<T>
我想使用 MassTransit 将 message.Data
结构可能不同的消息发送到不同的 Azure 服务总线队列。只要 router.Name
保持初始值,它就可以正常工作。但是,只要 EndpointConvention.Map<ManyToOneTransferMessage>
的目标 Uri 发生变化,MassTransit 就会抛出异常 "The endpoint convention has already been created and can no longer be modified"。有什么方法可以将消息类型重新映射到另一个目的地,以便将 MassTransit 与多个队列一起使用?
public class AzureServiceBusManager
{
string ServiceBusConnectionString = string.Empty;
public AzureServiceBusManager()
{
ServiceBusConnectionString = ConfigurationManager.AppSettings["AppSettings:ServiceBusConnectionString"];
}
public async Task SendMessageAsyncN1(TransferMessage transferMessage, Router router)
{
var message = new ManyToOneTransferMessage
{
BlobFileName = transferMessage.BlobFileName,
Compressed = transferMessage.Compressed,
Data = transferMessage.Data,
MessageId = transferMessage.MessageId,
TransferId = transferMessage.TransferId,
TransferType = transferMessage.TransferType
};
var queueBusControl = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
cfg.Host(ServiceBusConnectionString);
EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));
cfg.ReceiveEndpoint(router.Name, e =>
{
e.RequiresSession = true;
e.MaxConcurrentCalls = 500;
});
});
await queueBusControl.Send(message);
}
}
所以,首先,不要使用EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));
。它没有用,只会增加混乱。
您可以从总线解析端点,但您必须意识到为每个调用创建总线是一个坏主意。最好在启动时启动总线(您甚至没有在上面的代码中启动它),并在应用程序关闭时停止它。
然后,对于每个调用,您可以使用该总线来解析发送端点并发送消息。
var endpoint = await bus.GetSendEndpoint(new Uri("queue:" + router.Name));
await endpoint.Send(message);
此外,您应该删除它,因为它会导致所有消息被移动到 _skipped 队列:
cfg.ReceiveEndpoint(router.Name, e =>
{
e.RequiresSession = true;
e.MaxConcurrentCalls = 500;
});
如果您需要 Session,您可能需要提前单独配置队列,尽管我没有看到您在消息上设置 SessionId,因此如果没有它可能无法正常工作。
我想使用 MassTransit 将 message.Data
结构可能不同的消息发送到不同的 Azure 服务总线队列。只要 router.Name
保持初始值,它就可以正常工作。但是,只要 EndpointConvention.Map<ManyToOneTransferMessage>
的目标 Uri 发生变化,MassTransit 就会抛出异常 "The endpoint convention has already been created and can no longer be modified"。有什么方法可以将消息类型重新映射到另一个目的地,以便将 MassTransit 与多个队列一起使用?
public class AzureServiceBusManager
{
string ServiceBusConnectionString = string.Empty;
public AzureServiceBusManager()
{
ServiceBusConnectionString = ConfigurationManager.AppSettings["AppSettings:ServiceBusConnectionString"];
}
public async Task SendMessageAsyncN1(TransferMessage transferMessage, Router router)
{
var message = new ManyToOneTransferMessage
{
BlobFileName = transferMessage.BlobFileName,
Compressed = transferMessage.Compressed,
Data = transferMessage.Data,
MessageId = transferMessage.MessageId,
TransferId = transferMessage.TransferId,
TransferType = transferMessage.TransferType
};
var queueBusControl = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
cfg.Host(ServiceBusConnectionString);
EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));
cfg.ReceiveEndpoint(router.Name, e =>
{
e.RequiresSession = true;
e.MaxConcurrentCalls = 500;
});
});
await queueBusControl.Send(message);
}
}
所以,首先,不要使用EndpointConvention.Map<ManyToOneTransferMessage>(new Uri("queue:" + router.Name));
。它没有用,只会增加混乱。
您可以从总线解析端点,但您必须意识到为每个调用创建总线是一个坏主意。最好在启动时启动总线(您甚至没有在上面的代码中启动它),并在应用程序关闭时停止它。
然后,对于每个调用,您可以使用该总线来解析发送端点并发送消息。
var endpoint = await bus.GetSendEndpoint(new Uri("queue:" + router.Name));
await endpoint.Send(message);
此外,您应该删除它,因为它会导致所有消息被移动到 _skipped 队列:
cfg.ReceiveEndpoint(router.Name, e =>
{
e.RequiresSession = true;
e.MaxConcurrentCalls = 500;
});
如果您需要 Session,您可能需要提前单独配置队列,尽管我没有看到您在消息上设置 SessionId,因此如果没有它可能无法正常工作。