在总线启动后设置接收端点后设置自动删除
Set AutoDelete after when setting receive endpoint after bus has started
我在 net core 3.1 中使用 masstransit/rabbitmq。我有一个调度程序服务,它会在可用时向工作人员服务发送消息。每个 worker 服务前面都有一个 rabbitmq 队列,它是在服务启动时创建的。我想确保当工作人员服务停止时,需要删除队列(和交换)。当我在配置中设置标志 AutoDelete 时,我已经能够让它工作 (Program.cs):
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
settings.RabbitMq.VirtualHost, h =>
{
h.Username(settings.RabbitMq.Username);
h.Password(settings.RabbitMq.Password);
});
var queueName = AssembleQueueName(settings);
var sp = services.BuildServiceProvider();
config.ReceiveEndpoint(queueName,
e =>
{
e.Consumer(() => new MessageConsumer());
e.AutoDelete = true;
});
}));
});
不幸的是,这对我不起作用,因为我需要在我的消费者中安装 ServiceProvider class 因此我改为执行以下操作 (Worker.cs):
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = _settings.RabbitMq.ServicePrefixQueueName + "-" + _settings.ServiceId;
var messageHandler = _busControl.ConnectReceiveEndpoint(queueName, x =>
{
x.Consumer<MessageConsumer>(_serviceProvider);
});
await messageHandler.Ready;
_workerWitness.IsWorkerReady = true;
}
但是这里我不知道如何设置AutoDelete标志。有可能吗?
如果您 follow the documentation,在使用容器配置消费者时,您会看到您可以配置消费者,以便它们从容器中解析,如下所示(您的代码已更新为正确):
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingRabbitMq((context, config) =>
{
config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
settings.RabbitMq.VirtualHost, h =>
{
h.Username(settings.RabbitMq.Username);
h.Password(settings.RabbitMq.Password);
});
var queueName = AssembleQueueName(settings);
config.ReceiveEndpoint(queueName, e =>
{
e.AutoDelete = true;
e.ConfigureConsumer<MessageConsumer>(context);
});
}));
});
我在 net core 3.1 中使用 masstransit/rabbitmq。我有一个调度程序服务,它会在可用时向工作人员服务发送消息。每个 worker 服务前面都有一个 rabbitmq 队列,它是在服务启动时创建的。我想确保当工作人员服务停止时,需要删除队列(和交换)。当我在配置中设置标志 AutoDelete 时,我已经能够让它工作 (Program.cs):
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
settings.RabbitMq.VirtualHost, h =>
{
h.Username(settings.RabbitMq.Username);
h.Password(settings.RabbitMq.Password);
});
var queueName = AssembleQueueName(settings);
var sp = services.BuildServiceProvider();
config.ReceiveEndpoint(queueName,
e =>
{
e.Consumer(() => new MessageConsumer());
e.AutoDelete = true;
});
}));
});
不幸的是,这对我不起作用,因为我需要在我的消费者中安装 ServiceProvider class 因此我改为执行以下操作 (Worker.cs):
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = _settings.RabbitMq.ServicePrefixQueueName + "-" + _settings.ServiceId;
var messageHandler = _busControl.ConnectReceiveEndpoint(queueName, x =>
{
x.Consumer<MessageConsumer>(_serviceProvider);
});
await messageHandler.Ready;
_workerWitness.IsWorkerReady = true;
}
但是这里我不知道如何设置AutoDelete标志。有可能吗?
如果您 follow the documentation,在使用容器配置消费者时,您会看到您可以配置消费者,以便它们从容器中解析,如下所示(您的代码已更新为正确):
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingRabbitMq((context, config) =>
{
config.Host(settings.RabbitMq.Host, settings.RabbitMq.Port,
settings.RabbitMq.VirtualHost, h =>
{
h.Username(settings.RabbitMq.Username);
h.Password(settings.RabbitMq.Password);
});
var queueName = AssembleQueueName(settings);
config.ReceiveEndpoint(queueName, e =>
{
e.AutoDelete = true;
e.ConfigureConsumer<MessageConsumer>(context);
});
}));
});