如何在单个服务实例中使用给定的 IJobConsumer 运行 多个作业?
How do I run multiple jobs with a given IJobConsumer within a single service instance?
我希望能够在作业使用者上同时执行多个作业。目前,如果我 运行 一个服务实例并尝试同时执行 2 个作业,则 1 个作业等待另一个完成(即等待单个作业槽可用)。
但是,如果我 运行 2 个实例通过使用 dotnet 运行 两次来创建 2 个单独的进程,我能够在两个作业 运行 同时获得所需的行为。
是否可以在一个进程中为给定的消费者同时 运行 2 个(或更多)作业?我的应用程序需要能够同时 运行 多个作业,但我没有能力部署我的应用程序的许多实例。
检查应用程序日志我看到这一行我觉得可能与它有关:
[04:13:43 DBG] Concurrent Job Limit: 1
我尝试在 instance.ConfigureJobServiceEndpoints
上将 SagaPartitionCount
更改为 1 以外的值,但无济于事。我似乎无法更改并发作业限制。
我的配置是这样的:
services.AddMassTransit(x =>
{
x.AddDelayedMessageScheduler();
x.SetKebabCaseEndpointNameFormatter();
// registering the job consumer
x.AddConsumer<DeploymentConsumer>(typeof(DeploymentConsumerDefinition));
x.AddSagaRepository<JobSaga>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<JobServiceSagaDbContext>();
r.LockStatementProvider = new SqlServerLockStatementProvider();
});
// add other saga repositories here for JobTypeSaga and JobAttemptSaga here as well
x.UsingRabbitMq((context, cfg) =>
{
var rmq = configuration.GetSection("RabbitMq").Get<RabbitMq>();
cfg.Host(rmq.Host, rmq.Port, rmq.VirtualHost, h =>
{
h.Username(rmq.Username);
h.Password(rmq.Password);
});
cfg.UseDelayedMessageScheduler();
var options = new ServiceInstanceOptions()
.SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ?? KebabCaseEndpointNameFormatter.Instance);
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobServiceEndpoints(js =>
{
js.SagaPartitionCount = 1;
js.FinalizeCompleted = true;
js.ConfigureSagaRepositories(context);
});
instance.ConfigureEndpoints(context);
});
});
}
DeploymentConsumerDefinition
看起来像
public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<DeploymentConsumer>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(20));
options.SetConcurrentJobLimit(10);
options.SetRetry(r =>
{
r.Ignore<InvalidOperationException>();
r.Interval(5, TimeSpan.FromSeconds(10));
});
});
}
}
您的定义应指定作业使用者消息类型,而不是作业使用者类型:
public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
{
// MESSAGE TYPE NOT CONSUMER TYPE
consumerConfigurator.Options<JobOptions<DeploymentCommand>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(20));
options.SetConcurrentJobLimit(10);
options.SetRetry(r =>
{
r.Ignore<InvalidOperationException>();
r.Interval(5, TimeSpan.FromSeconds(10));
});
});
}
}
我希望能够在作业使用者上同时执行多个作业。目前,如果我 运行 一个服务实例并尝试同时执行 2 个作业,则 1 个作业等待另一个完成(即等待单个作业槽可用)。
但是,如果我 运行 2 个实例通过使用 dotnet 运行 两次来创建 2 个单独的进程,我能够在两个作业 运行 同时获得所需的行为。
是否可以在一个进程中为给定的消费者同时 运行 2 个(或更多)作业?我的应用程序需要能够同时 运行 多个作业,但我没有能力部署我的应用程序的许多实例。
检查应用程序日志我看到这一行我觉得可能与它有关:
[04:13:43 DBG] Concurrent Job Limit: 1
我尝试在 instance.ConfigureJobServiceEndpoints
上将 SagaPartitionCount
更改为 1 以外的值,但无济于事。我似乎无法更改并发作业限制。
我的配置是这样的:
services.AddMassTransit(x =>
{
x.AddDelayedMessageScheduler();
x.SetKebabCaseEndpointNameFormatter();
// registering the job consumer
x.AddConsumer<DeploymentConsumer>(typeof(DeploymentConsumerDefinition));
x.AddSagaRepository<JobSaga>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<JobServiceSagaDbContext>();
r.LockStatementProvider = new SqlServerLockStatementProvider();
});
// add other saga repositories here for JobTypeSaga and JobAttemptSaga here as well
x.UsingRabbitMq((context, cfg) =>
{
var rmq = configuration.GetSection("RabbitMq").Get<RabbitMq>();
cfg.Host(rmq.Host, rmq.Port, rmq.VirtualHost, h =>
{
h.Username(rmq.Username);
h.Password(rmq.Password);
});
cfg.UseDelayedMessageScheduler();
var options = new ServiceInstanceOptions()
.SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ?? KebabCaseEndpointNameFormatter.Instance);
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobServiceEndpoints(js =>
{
js.SagaPartitionCount = 1;
js.FinalizeCompleted = true;
js.ConfigureSagaRepositories(context);
});
instance.ConfigureEndpoints(context);
});
});
}
DeploymentConsumerDefinition
看起来像
public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
{
consumerConfigurator.Options<JobOptions<DeploymentConsumer>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(20));
options.SetConcurrentJobLimit(10);
options.SetRetry(r =>
{
r.Ignore<InvalidOperationException>();
r.Interval(5, TimeSpan.FromSeconds(10));
});
});
}
}
您的定义应指定作业使用者消息类型,而不是作业使用者类型:
public class DeploymentConsumerDefinition : ConsumerDefinition<DeploymentConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DeploymentConsumer> consumerConfigurator)
{
// MESSAGE TYPE NOT CONSUMER TYPE
consumerConfigurator.Options<JobOptions<DeploymentCommand>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(20));
options.SetConcurrentJobLimit(10);
options.SetRetry(r =>
{
r.Ignore<InvalidOperationException>();
r.Interval(5, TimeSpan.FromSeconds(10));
});
});
}
}