如何在不使用Send和RequestAddress的情况下,根据消息内容向不同的worker实例发布两条相同类型的消息?

How to publish two messages of the same type to different worker instances based on the message content without using Send and RequestAddress?

如何在不使用Send和RequestAddress的情况下,根据消息内容将两条相同类型的消息发布到不同的worker实例?

我的场景是:

我正在使用 Azure ServiceBus 和 Azure StorageTables。

我是运行同一个worker service workera和workerb的两个不同实例。我需要 workera 和 workerb 都根据 Command.WorkerPrefix.

的值使用 Command 类型的消息

命令类型如下所示:

{
    WorkerPrefix = "a"
}

附加限制:

对于工作者服务将使用的所有消息传递实体,我想添加一个不同的前缀,无论是它的 workera 还是 workerb,所以我使用代码 cfg.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter(args.[0], false)) where args.[0]是“a”或“b”

现在我只定义了一个消费命令类型消息的消费者

这给了我这样的拓扑:

Queues:
- a-command
- b-command

Topics:
- command
  Subscriptions:
  - a-command -> fwd a-command
  - b-command -> fwd b-command

理想情况下拓扑应该是:

Queues:
- a-command
- b-command

Topics:
- a-command
  Subscriptions:
  - a-command -> fwd a-command
- b-command
  Subscriptions:
  - b-command -> fwd b-command

之所以需要这种拓扑结构,是因为 workera 没有权限创建对 workerb 主题的订阅,反之亦然。如果这不可能,我可以通过在服务启动之前创建拓扑来解决部署时的权限问题 运行,但更喜欢上面的拓扑。

现在,我想从编排器(Saga 或 Future)发布两个命令类型的请求。一个用于 workera,一个用于 workerb。

是否可以根据消息的内容配置 IPublishEndpoint,以便发布者可以检查消息,看到 WorkerPrefix 是“a”,然后发布到 a-command 主题?

将 MassTransit 与 Azure 服务总线结合使用,我建议将消息​​路由负担从发布者身上移开,并将其转移给消费者。通过配置接收端点并使用订阅过滤器,每个实例都会添加自己的订阅并使用消息 header 来过滤发布的消息。

在发布者上,将添加一条消息 header:

await publishEndpoint.Publish(new Command(), x => x.Headers.Set("WorkerId", "A"));

消息将发布到 Command 主题,然后通过订阅路由到接收端点。

使用订阅过滤器

要为实例 A 配置接收端点,无论是手动配置接收端点还是使用消费者定义,接收端点配置器都将配置如下所示。由于问题似乎是使用 ConfigureEndpoints,因此使用了消费者定义方法。

Note that Microsoft.Extensions.Options is used to map configuration to the container, so that it is available when configuring the bus.

services.AddMassTransit(x =>
{
    x.AddConsumer<CommandConsumer, CommandConsumerDefinition>();

    x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter("A", false));

    x.UsingAzureServiceBus((context, cfg) =>
    {
        var hostOptions = context.GetRequiredService<IOptions<Host>>();

        cfg.Host(hostOptions.Value.ConnectionString);

        cfg.ConfigureEndpoints(context);
    });
});

消费者定义随后将配置接收端点:

public class CommandConsumerDefinition :
    ConsumerDefinition<CommandConsumer>
{
    readonly string _workerId;
    readonly IEndpointNameFormatter _endpointNameFormatter;

    // using IOptions here, but somehow in the container the worker settings need to be resolved
    // to get the workerId
    public CommandConsumerDefinition(IOptions<Worker> options, IEndpointNameFormatter endpointNameFormatter)
    {
        _workerId = options.Value.WorkerId;
        _endpointNameFormatter = endpointNameFormatter;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CommandConsumer> consumerConfigurator)
    {
        if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
        {
            sb.ConfigureConsumeTopology = false;

            var subscriptionName = _endpointNameFormatter.Consumer<CommandConsumer>();

            sb.Subscribe<Command>(subscriptionName, 
                s => s.Filter = new SqlFilter($"WorkerId = '{_workerId}'"));
        }
    }
}

使用订阅过滤器,发布消息时添加的header值用于过滤消息,以便只有匹配过滤器的消息才会转发给worker queue。

生成的拓扑结构:

Queues:
- a-command
- b-command

Topics:
- command
  Subscriptions:
  - a-command (WorkerId = "A") -> fwd a-command
  - b-command (WorkerId = "B") -> fwd b-command

为什么不直接发送?

如果您根本不想处理主题,您可以发送到 queue 并使用前缀自行格式化名称。然后,只需使用 ISendEndpointProvider(或传奇中的 .Send)到使用端点名称格式化程序结合前缀格式化的目标地址。

var destinationAddress = $"queue:{workerId}-{KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()}";
var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri(destinationAddress));
await endpoint.Send(new Command());

那么,有几个选项可能适合您。