Inmemoryoutbox 不适用于公共交通 jobconsumers jobcontext

Inmemoryoutbox not work with a masstransit jobconsumers jobcontext

我正在使用 JobConsumer select 一批要在不同进程中处理的项目。对于每个要处理的项目,都会发布一条新消息。我正在使用 inmemoryoutbox,但消息直接发布到 rabbitmq。实际上,我希望所有这些消息都收集在一个 memoryoutbox 中,并在循环成功后全部发送。 当我调试我的代码时,PublishEndpointProvider 有一个 Masstransit.Middleware.InMemoryOutbix.InMemoryOutboxPublishEndpointProvider。但是消息直接发送到 RabbitMq。

我正在使用 Masstransit 版本 8.0.1。

知道配置中缺少什么吗?

配置

services.AddMassTransit(mt =>
{
  mt.AddDelayedMessageScheduler();
  mt.AddConsumersFromNamespaceContaining<RunConsumer>();

  mt.UsingRabbitMq((context, cfg) =>
  {
    cfg.UseDelayedMessageScheduler();
    cfg.Host(rabbitMqConfig.Host, host =>
    {
      host.Username(rabbitMqConfig.Username);
      host.Password(rabbitMqConfig.Password);
      host.Heartbeat(rabbitMqConfig.Heartbeat);
    });

    cfg.UseGlobalRetryPolicy();
    cfg.UseInMemoryOutbox();
    var options = new ServiceInstanceOptions()
                     .EnableJobServiceEndpoints();
    cfg.ServiceInstance(options, instance =>
    {
      instance.ConfigureJobServiceEndpoints(x =>
      {
        x.JobServiceStateEndpointName = "JobType";
        x.JobServiceJobAttemptStateEndpointName = "JobAttempt";
        x.JobServiceJobStateEndpointName = "Job";
      });
      instance.ConfigureEndpoints(context);
    });
  });
});
services.AddHostedService<MassTransitConsoleHostedService>();
return services;

消费者定义

public class RunConsumerDefinition :
        ConsumerDefinition<RunConsumer>
{
  private IBusRegistrationContext context;

  public ProlongeerPeriodeConsumerDefinition(IBusRegistrationContext context)
  {
    this.context = context;
    EndpointName = 'MyQueue';
    ConcurrentMessageLimit = 1;
  }

  protected override void ConfigureConsumer(
    IReceiveEndpointConfigurator endpointConfigurator, 
    IConsumerConfigurator<RunConsumer> consumerConfigurator)
    {
      consumerConfigurator.Options<JobOptions<RunConsumer>>(options => options
                .SetJobTimeout(TimeSpan.FromMinutes(15))
                .SetConcurrentJobLimit(2));
            endpointConfigurator.UseMessageScope(context);
            endpointConfigurator.UseInMemoryOutbox();
    }
  }

运行消费者

public async Task Run(JobContext<Run> context)
{
  foreach (var index in Enumerable.Range(1, 7))
  {
    var command = new ProcessCommand();
    await context.Publish<ProcessCommand>(command, context.CancellationToken);
  }
}

从技术上讲,工作消费者并不 运行 作为消费者,他们 运行 与 JobContext<T> 分开。因此,他们不使用管道中的任何重试、发件箱或其他过滤器。