Inmemoryoutbox 不适用于公共交通 jobconsumers jobcontext
Inmemoryoutbox not work with a masstransit jobconsumers jobcontext
我正在使用 JobConsumer select 一批要在不同进程中处理的项目。对于每个要处理的项目,都会发布一条新消息。我正在使用 inmemoryoutbox,但消息直接发布到 rabbitmq。实际上,我希望所有这些消息都收集在一个 memoryoutbox 中,并在循环成功后全部发送。
当我调试我的代码时,PublishEndpointProvider 有一个 Masstransit.Middleware.InMemoryOutbix.InMemoryOutboxPublishEndpointProvider。但是消息直接发送到 RabbitMq。
我正在使用 Masstransit 版本 8.0.1。
知道配置中缺少什么吗?
配置
services.AddMassTransit(mt =>
{
mt.AddDelayedMessageScheduler();
mt.AddConsumersFromNamespaceContaining<RunConsumer>();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.Host(rabbitMqConfig.Host, host =>
{
host.Username(rabbitMqConfig.Username);
host.Password(rabbitMqConfig.Password);
host.Heartbeat(rabbitMqConfig.Heartbeat);
});
cfg.UseGlobalRetryPolicy();
cfg.UseInMemoryOutbox();
var options = new ServiceInstanceOptions()
.EnableJobServiceEndpoints();
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobServiceEndpoints(x =>
{
x.JobServiceStateEndpointName = "JobType";
x.JobServiceJobAttemptStateEndpointName = "JobAttempt";
x.JobServiceJobStateEndpointName = "Job";
});
instance.ConfigureEndpoints(context);
});
});
});
services.AddHostedService<MassTransitConsoleHostedService>();
return services;
消费者定义
public class RunConsumerDefinition :
ConsumerDefinition<RunConsumer>
{
private IBusRegistrationContext context;
public ProlongeerPeriodeConsumerDefinition(IBusRegistrationContext context)
{
this.context = context;
EndpointName = 'MyQueue';
ConcurrentMessageLimit = 1;
}
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<RunConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<RunConsumer>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(2));
endpointConfigurator.UseMessageScope(context);
endpointConfigurator.UseInMemoryOutbox();
}
}
运行消费者
public async Task Run(JobContext<Run> context)
{
foreach (var index in Enumerable.Range(1, 7))
{
var command = new ProcessCommand();
await context.Publish<ProcessCommand>(command, context.CancellationToken);
}
}
从技术上讲,工作消费者并不 运行 作为消费者,他们 运行 与 JobContext<T>
分开。因此,他们不使用管道中的任何重试、发件箱或其他过滤器。
我正在使用 JobConsumer select 一批要在不同进程中处理的项目。对于每个要处理的项目,都会发布一条新消息。我正在使用 inmemoryoutbox,但消息直接发布到 rabbitmq。实际上,我希望所有这些消息都收集在一个 memoryoutbox 中,并在循环成功后全部发送。 当我调试我的代码时,PublishEndpointProvider 有一个 Masstransit.Middleware.InMemoryOutbix.InMemoryOutboxPublishEndpointProvider。但是消息直接发送到 RabbitMq。
我正在使用 Masstransit 版本 8.0.1。
知道配置中缺少什么吗?
配置
services.AddMassTransit(mt =>
{
mt.AddDelayedMessageScheduler();
mt.AddConsumersFromNamespaceContaining<RunConsumer>();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.Host(rabbitMqConfig.Host, host =>
{
host.Username(rabbitMqConfig.Username);
host.Password(rabbitMqConfig.Password);
host.Heartbeat(rabbitMqConfig.Heartbeat);
});
cfg.UseGlobalRetryPolicy();
cfg.UseInMemoryOutbox();
var options = new ServiceInstanceOptions()
.EnableJobServiceEndpoints();
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobServiceEndpoints(x =>
{
x.JobServiceStateEndpointName = "JobType";
x.JobServiceJobAttemptStateEndpointName = "JobAttempt";
x.JobServiceJobStateEndpointName = "Job";
});
instance.ConfigureEndpoints(context);
});
});
});
services.AddHostedService<MassTransitConsoleHostedService>();
return services;
消费者定义
public class RunConsumerDefinition :
ConsumerDefinition<RunConsumer>
{
private IBusRegistrationContext context;
public ProlongeerPeriodeConsumerDefinition(IBusRegistrationContext context)
{
this.context = context;
EndpointName = 'MyQueue';
ConcurrentMessageLimit = 1;
}
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<RunConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<RunConsumer>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(2));
endpointConfigurator.UseMessageScope(context);
endpointConfigurator.UseInMemoryOutbox();
}
}
运行消费者
public async Task Run(JobContext<Run> context)
{
foreach (var index in Enumerable.Range(1, 7))
{
var command = new ProcessCommand();
await context.Publish<ProcessCommand>(command, context.CancellationToken);
}
}
从技术上讲,工作消费者并不 运行 作为消费者,他们 运行 与 JobContext<T>
分开。因此,他们不使用管道中的任何重试、发件箱或其他过滤器。