如何在不使用Send和RequestAddress的情况下,根据消息内容向不同的worker实例发布两条相同类型的消息?
How to publish two messages of the same type to different worker instances based on the message content without using Send and RequestAddress?
如何在不使用Send和RequestAddress的情况下,根据消息内容将两条相同类型的消息发布到不同的worker实例?
我的场景是:
我正在使用 Azure ServiceBus 和 Azure StorageTables。
我是运行同一个worker service workera和workerb的两个不同实例。我需要 workera 和 workerb 都根据 Command.WorkerPrefix.
的值使用 Command 类型的消息
命令类型如下所示:
{
WorkerPrefix = "a"
}
附加限制:
对于工作者服务将使用的所有消息传递实体,我想添加一个不同的前缀,无论是它的 workera 还是 workerb,所以我使用代码 cfg.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter(args.[0], false))
where args.[0]是“a”或“b”
现在我只定义了一个消费命令类型消息的消费者
这给了我这样的拓扑:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command -> fwd a-command
- b-command -> fwd b-command
理想情况下拓扑应该是:
Queues:
- a-command
- b-command
Topics:
- a-command
Subscriptions:
- a-command -> fwd a-command
- b-command
Subscriptions:
- b-command -> fwd b-command
之所以需要这种拓扑结构,是因为 workera 没有权限创建对 workerb 主题的订阅,反之亦然。如果这不可能,我可以通过在服务启动之前创建拓扑来解决部署时的权限问题 运行,但更喜欢上面的拓扑。
现在,我想从编排器(Saga 或 Future)发布两个命令类型的请求。一个用于 workera,一个用于 workerb。
是否可以根据消息的内容配置 IPublishEndpoint,以便发布者可以检查消息,看到 WorkerPrefix 是“a”,然后发布到 a-command 主题?
将 MassTransit 与 Azure 服务总线结合使用,我建议将消息路由负担从发布者身上移开,并将其转移给消费者。通过配置接收端点并使用订阅过滤器,每个实例都会添加自己的订阅并使用消息 header 来过滤发布的消息。
在发布者上,将添加一条消息 header:
await publishEndpoint.Publish(new Command(), x => x.Headers.Set("WorkerId", "A"));
消息将发布到 Command 主题,然后通过订阅路由到接收端点。
使用订阅过滤器
要为实例 A
配置接收端点,无论是手动配置接收端点还是使用消费者定义,接收端点配置器都将配置如下所示。由于问题似乎是使用 ConfigureEndpoints,因此使用了消费者定义方法。
Note that Microsoft.Extensions.Options
is used to map configuration to the container, so that it is available when configuring the bus.
services.AddMassTransit(x =>
{
x.AddConsumer<CommandConsumer, CommandConsumerDefinition>();
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter("A", false));
x.UsingAzureServiceBus((context, cfg) =>
{
var hostOptions = context.GetRequiredService<IOptions<Host>>();
cfg.Host(hostOptions.Value.ConnectionString);
cfg.ConfigureEndpoints(context);
});
});
消费者定义随后将配置接收端点:
public class CommandConsumerDefinition :
ConsumerDefinition<CommandConsumer>
{
readonly string _workerId;
readonly IEndpointNameFormatter _endpointNameFormatter;
// using IOptions here, but somehow in the container the worker settings need to be resolved
// to get the workerId
public CommandConsumerDefinition(IOptions<Worker> options, IEndpointNameFormatter endpointNameFormatter)
{
_workerId = options.Value.WorkerId;
_endpointNameFormatter = endpointNameFormatter;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CommandConsumer> consumerConfigurator)
{
if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
{
sb.ConfigureConsumeTopology = false;
var subscriptionName = _endpointNameFormatter.Consumer<CommandConsumer>();
sb.Subscribe<Command>(subscriptionName,
s => s.Filter = new SqlFilter($"WorkerId = '{_workerId}'"));
}
}
}
使用订阅过滤器,发布消息时添加的header值用于过滤消息,以便只有匹配过滤器的消息才会转发给worker queue。
生成的拓扑结构:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command (WorkerId = "A") -> fwd a-command
- b-command (WorkerId = "B") -> fwd b-command
为什么不直接发送?
如果您根本不想处理主题,您可以发送到 queue 并使用前缀自行格式化名称。然后,只需使用 ISendEndpointProvider(或传奇中的 .Send)到使用端点名称格式化程序结合前缀格式化的目标地址。
var destinationAddress = $"queue:{workerId}-{KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()}";
var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri(destinationAddress));
await endpoint.Send(new Command());
那么,有几个选项可能适合您。
如何在不使用Send和RequestAddress的情况下,根据消息内容将两条相同类型的消息发布到不同的worker实例?
我的场景是:
我正在使用 Azure ServiceBus 和 Azure StorageTables。
我是运行同一个worker service workera和workerb的两个不同实例。我需要 workera 和 workerb 都根据 Command.WorkerPrefix.
的值使用 Command 类型的消息命令类型如下所示:
{
WorkerPrefix = "a"
}
附加限制:
对于工作者服务将使用的所有消息传递实体,我想添加一个不同的前缀,无论是它的 workera 还是 workerb,所以我使用代码 cfg.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter(args.[0], false))
where args.[0]是“a”或“b”
现在我只定义了一个消费命令类型消息的消费者
这给了我这样的拓扑:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command -> fwd a-command
- b-command -> fwd b-command
理想情况下拓扑应该是:
Queues:
- a-command
- b-command
Topics:
- a-command
Subscriptions:
- a-command -> fwd a-command
- b-command
Subscriptions:
- b-command -> fwd b-command
之所以需要这种拓扑结构,是因为 workera 没有权限创建对 workerb 主题的订阅,反之亦然。如果这不可能,我可以通过在服务启动之前创建拓扑来解决部署时的权限问题 运行,但更喜欢上面的拓扑。
现在,我想从编排器(Saga 或 Future)发布两个命令类型的请求。一个用于 workera,一个用于 workerb。
是否可以根据消息的内容配置 IPublishEndpoint,以便发布者可以检查消息,看到 WorkerPrefix 是“a”,然后发布到 a-command 主题?
将 MassTransit 与 Azure 服务总线结合使用,我建议将消息路由负担从发布者身上移开,并将其转移给消费者。通过配置接收端点并使用订阅过滤器,每个实例都会添加自己的订阅并使用消息 header 来过滤发布的消息。
在发布者上,将添加一条消息 header:
await publishEndpoint.Publish(new Command(), x => x.Headers.Set("WorkerId", "A"));
消息将发布到 Command 主题,然后通过订阅路由到接收端点。
使用订阅过滤器
要为实例 A
配置接收端点,无论是手动配置接收端点还是使用消费者定义,接收端点配置器都将配置如下所示。由于问题似乎是使用 ConfigureEndpoints,因此使用了消费者定义方法。
Note that
Microsoft.Extensions.Options
is used to map configuration to the container, so that it is available when configuring the bus.
services.AddMassTransit(x =>
{
x.AddConsumer<CommandConsumer, CommandConsumerDefinition>();
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter("A", false));
x.UsingAzureServiceBus((context, cfg) =>
{
var hostOptions = context.GetRequiredService<IOptions<Host>>();
cfg.Host(hostOptions.Value.ConnectionString);
cfg.ConfigureEndpoints(context);
});
});
消费者定义随后将配置接收端点:
public class CommandConsumerDefinition :
ConsumerDefinition<CommandConsumer>
{
readonly string _workerId;
readonly IEndpointNameFormatter _endpointNameFormatter;
// using IOptions here, but somehow in the container the worker settings need to be resolved
// to get the workerId
public CommandConsumerDefinition(IOptions<Worker> options, IEndpointNameFormatter endpointNameFormatter)
{
_workerId = options.Value.WorkerId;
_endpointNameFormatter = endpointNameFormatter;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CommandConsumer> consumerConfigurator)
{
if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
{
sb.ConfigureConsumeTopology = false;
var subscriptionName = _endpointNameFormatter.Consumer<CommandConsumer>();
sb.Subscribe<Command>(subscriptionName,
s => s.Filter = new SqlFilter($"WorkerId = '{_workerId}'"));
}
}
}
使用订阅过滤器,发布消息时添加的header值用于过滤消息,以便只有匹配过滤器的消息才会转发给worker queue。
生成的拓扑结构:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command (WorkerId = "A") -> fwd a-command
- b-command (WorkerId = "B") -> fwd b-command
为什么不直接发送?
如果您根本不想处理主题,您可以发送到 queue 并使用前缀自行格式化名称。然后,只需使用 ISendEndpointProvider(或传奇中的 .Send)到使用端点名称格式化程序结合前缀格式化的目标地址。
var destinationAddress = $"queue:{workerId}-{KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()}";
var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri(destinationAddress));
await endpoint.Send(new Command());
那么,有几个选项可能适合您。