Masstransit 通过指定队列绑定在容器中添加消费者

Masstransit add consumer in container by specifying queue binding

我的消费者类型如下:

internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
    public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
    {
        var @event = context.Message;
        await HandleAsync(@event).ConfigureAwait(false);
    }
}

通过以下方式在我的容器中注册:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
        {
            hostConfigurator.Username(configurationProvider.RabbitMQUsername);
            hostConfigurator.Password(configurationProvider.RabbitMQPassword);

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
                c.ClusterMembers = hostnames;
            });
        });

        host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;

        /*HERE*/ x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);

        cfg.ConfigureEndpoints(container);
    }));

});

但是按照 documentation 我想设置一个 直接交换 以便使用路由键。在文档中找不到如何按照我的方式添加使用者,同时如何设置文档中报告的端点的绑定属性。

当我在添加消费者时尝试访问端点时,我只能修改名称、预取计数和其他几个属性,但仅此而已。但是,我想将我的端点设置为仅使用路由键 tenantName 接受消息。有什么办法吗?

编辑:

发布方:

container.AddMassTransit(x =>
{
    x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {

        cfg.Send<ObjectAddedIntegrationEvent>(routingCfg => {
            routingCfg.UseRoutingKeyFormatter(config => ConfigurationValuesProvider.Current.Get("TenantCode"));
        });
        cfg.Message<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.SetEntityName("ObjectAddedIntegrationEvent"));
        cfg.Publish<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.ExchangeType = ExchangeType.Direct);


        var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
        {
#if !DEBUG
            hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
            hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
                c.ClusterMembers = hostnames;
            });
#endif
        });
    }));
});

接收方:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
        {
            hostConfigurator.Username(configurationProvider.RabbitMQUsername);
            hostConfigurator.Password(configurationProvider.RabbitMQPassword);

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
                c.ClusterMembers = hostnames;
            });
        });

        host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;

        x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);

        cfg.ConfigureEndpoints(container);
    }));

});



internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
    public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
    {
        var @event = context.Message;
        await HandleAsync(@event).ConfigureAwait(false);
    }
}


internal class ObjectAddedHandlerConsumerDefinition :
            ConsumerDefinition<ObjectAddedHandler>
{
    private readonly IConfigurationProvider _provider;

    public ObjectAddedHandlerConsumerDefinition(IConfigurationProvider provider)
    {
        _provider = provider;

        EndpointName = "ObjectAddedHandler" + provider.TenantName;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
    IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
    {
        if (endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
        {
            rabbit.BindMessageExchanges = false;

            rabbit.Bind("ObjectAddedIntegrationEvent", s =>
            {
                s.RoutingKey = _provider.TenantName;
                s.ExchangeType = ExchangeType.Direct;
            });
        }
    }
}

当消费者启动时,ObjectAddedIntegrationEvent 直接交换被正确创建。 ObjectAddedHandlerTenant 扇出交换(应该是匹配的交换)也被正确创建。 不幸的是,当我尝试 发送 来自发布方的消息并且我监视 ObjectAddedIntegrationEvent 直接交换时,我没有看到任何消息。

您可以使用消费者定义来做到这一点,消费者定义应该使用 AddConsumer<T>(typeof(definitionclass)) 添加到您的消费者中。它可以类似于这样:

public class ObjectAddedHandlerDefinition :
    ConsumerDefinition<ObjectAddedHandler>
{
    public ObjectAddedHandlerDefinition(IConfigurationProvider provider)
    {
        _provider = provider;

        EndpointName = "ObjectAddedHandler" + provider.TenantName;

        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
    {
        if(endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
        {
            rabbit.BindMessageExchanges = false;

            // or use Bind<T> for message type name
            rabbit.Bind("some-exchange", s => 
            {
                s.RoutingKey = _provider.TenantName;
                s.ExchangeType = ExchangeType.Direct;
            });
        }
    }
}