如何在单个服务实例中使用给定的 IJobConsumer 运行 多个作业?

How do I run multiple jobs with a given IJobConsumer within a single service instance?

我希望能够在作业使用者上同时执行多个作业。目前,如果我 运行 一个服务实例并尝试同时执行 2 个作业,则 1 个作业等待另一个完成(即等待单个作业槽可用)。

但是,如果我 运行 2 个实例通过使用 dotnet 运行 两次来创建 2 个单独的进程,我能够在两个作业 运行 同时获得所需的行为。

是否可以在一个进程中为给定的消费者同时 运行 2 个(或更多)作业?我的应用程序需要能够同时 运行 多个作业,但我没有能力部署我的应用程序的许多实例。

检查应用程序日志我看到这一行我觉得可能与它有关:

[04:13:43 DBG] Concurrent Job Limit: 1

我尝试在 instance.ConfigureJobServiceEndpoints 上将 SagaPartitionCount 更改为 1 以外的值,但无济于事。我似乎无法更改并发作业限制。

我的配置是这样的:

services.AddMassTransit(x =>
{
    x.AddDelayedMessageScheduler();
    x.SetKebabCaseEndpointNameFormatter();

    // registering the job consumer
    x.AddConsumer<DeploymentConsumer>(typeof(DeploymentConsumerDefinition));

    x.AddSagaRepository<JobSaga>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<JobServiceSagaDbContext>();
            r.LockStatementProvider = new SqlServerLockStatementProvider();
        });
    // add other saga repositories here for JobTypeSaga and JobAttemptSaga here as well


    x.UsingRabbitMq((context, cfg) =>
    {
        var rmq = configuration.GetSection("RabbitMq").Get<RabbitMq>();
        cfg.Host(rmq.Host, rmq.Port, rmq.VirtualHost, h =>
        {
            h.Username(rmq.Username);
            h.Password(rmq.Password);
        });

        cfg.UseDelayedMessageScheduler();

        var options = new ServiceInstanceOptions()
            .SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ?? KebabCaseEndpointNameFormatter.Instance);

        cfg.ServiceInstance(options, instance =>
        {
            instance.ConfigureJobServiceEndpoints(js =>
            {
                js.SagaPartitionCount = 1;
                js.FinalizeCompleted = true;
                js.ConfigureSagaRepositories(context);
            });

            instance.ConfigureEndpoints(context);
        });
    });
}

DeploymentConsumerDefinition 看起来像

public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
    {
        consumerConfigurator.Options<JobOptions<DeploymentConsumer>>(options =>
        {
            options.SetJobTimeout(TimeSpan.FromMinutes(20));
            options.SetConcurrentJobLimit(10);
            options.SetRetry(r =>
            {
                r.Ignore<InvalidOperationException>();
                r.Interval(5, TimeSpan.FromSeconds(10));
            });
        });
    }
}

您的定义应指定作业使用者消息类型,而不是作业使用者类型:

public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
    {

                                                // MESSAGE TYPE NOT CONSUMER TYPE
        consumerConfigurator.Options<JobOptions<DeploymentCommand>>(options =>
        {
            options.SetJobTimeout(TimeSpan.FromMinutes(20));
            options.SetConcurrentJobLimit(10);
            options.SetRetry(r =>
            {
                r.Ignore<InvalidOperationException>();
                r.Interval(5, TimeSpan.FromSeconds(10));
            });
        });
    }
}