Spring Cloud Data Flow: Asynchronous DeployerPartitionHandler
TL;DR
I built a simple application based on this example that uses Spring Batch (remote partitioning) and Spring Cloud Data Flow to deploy worker pods on Kubernetes.
Looking at the logs of the "partitionedJob" pod created on Kubernetes, I see that the worker steps (pods) are launched sequentially. Launching one worker pod takes roughly 10-15 seconds (sometimes up to 2 minutes, as shown below). As a result, the worker pods start one after another at 10-15 second intervals.
Logs:
[info 2021/06/26 14:30:29.089 UTC <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{maxWorkers=40, chunkSize=5000, run.id=13, batch.worker-app=docker://docker-myhost.artifactrepository.net/my-project/myjob:0.1, grideSize=40}]
[info 2021/06/26 14:30:29.155 UTC <main> tid=0x1] The job execution id 26 was run within the task execution 235
[info 2021/06/26 14:30:29.184 UTC <main> tid=0x1] Executing step: [masterStep]
2021-06-26 14:30:29 INFO AuditRecordPartitioner:51 - Creating partitions. [gridSize=40]
[info 2021/06/26 14:32:41.128 UTC <main> tid=0x1] Using Docker entry point style: exec
[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1
[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker entry point style: exec
[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1
[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker entry point style: exec
[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1
[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker entry point style: exec
[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1
[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker entry point style: exec
Creating 40 pods on Kubernetes takes roughly 7-8 minutes (sometimes as long as 20 minutes). Ideally, all partitioned steps (worker pods) would be launched asynchronously at once.
Question: How can we configure Spring Cloud Data Flow / Spring Batch to launch the worker pods (partitioned steps) asynchronously/in parallel instead of sequentially? And if SCDF really does create all 40 partitions at once, why does the master job actually create the pods one by one at such a slow pace (as seen in the logs)? I don't think this is an infrastructure issue, because I am able to launch tasks quickly using the Task DSL.
Relevant code:
@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

/**
 * Main job controller.
 */
@Profile("master")
@Configuration
public class MasterConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterConfiguration.class);

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory) {
        LOGGER.info("Creating job...");
        SimpleJobBuilder jobBuilder = jobBuilderFactory.get("job").start(masterStep(null, null, null));
        jobBuilder.incrementer(new RunIdIncrementer());
        return jobBuilder.build();
    }

    @Bean
    public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner,
            PartitionHandler partitionHandler) {
        LOGGER.info("Creating masterStep");
        return stepBuilderFactory.get("masterStep").partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler).build();
    }

    @Bean
    public DeployerPartitionHandler partitionHandler(@Value("${spring.profiles.active}") String activeProfile,
            @Value("${batch.worker-app}") String resourceLocation,
            @Value("${spring.application.name}") String applicationName, ApplicationContext context,
            TaskLauncher taskLauncher, JobExplorer jobExplorer, ResourceLoaderResolver resolver) {
        ResourceLoader resourceLoader = resolver.get(resourceLocation);
        Resource resource = resourceLoader.getResource(resourceLocation);
        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep");
        List<String> commandLineArgs = new ArrayList<>();
        commandLineArgs.add("--spring.profiles.active=" + activeProfile.replace("master", "worker"));
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        commandLineArgs.addAll(Arrays.stream(applicationArguments.getSourceArgs()).filter(
                x -> !x.startsWith("--spring.profiles.active=") && !x.startsWith("--spring.cloud.task.executionid="))
                .collect(Collectors.toList()));
        commandLineArgs.addAll(applicationArguments.getNonOptionArgs());
        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        List<String> nonOptionArgs = applicationArguments.getNonOptionArgs();
        partitionHandler.setMaxWorkers(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 3)));
        partitionHandler.setGridSize(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 4)));
        partitionHandler.setApplicationName(applicationName);
        return partitionHandler;
    }

    @Bean("auditRecordPartitioner")
    public Partitioner auditRecordPartitioner() {
        return new AuditRecordPartitioner<>();
    }

    private String getNonOptionArgValue(List<String> nonOptionArgs, int index) {
        return nonOptionArgs.get(index).split("=")[1];
    }
}

@Profile("worker")
@Configuration
public class WorkerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public DeployerStepExecutionHandler stepExecutionHandler(ApplicationContext context, JobExplorer jobExplorer,
            JobRepository jobRepository) {
        LOGGER.info("stepExecutionHandler...");
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    @Bean
    public Step workerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("workerStep").tasklet(workerTasklet(null)).build();
    }

    @Bean
    @StepScope
    public WorkerTasklet workerTasklet(@Value("#{stepExecutionContext['key']}") String key) {
        return new WorkerTasklet(key);
    }
}
Note: I pass gridSize and maxWorkers as input arguments to the master step (from the SCDF UI when launching the task).
For demonstration purposes, the example sets the maximum number of workers to 2 here. So for your 40 partitions, only two workers are launched in parallel, which gives the impression that your partitions are processed sequentially.
You need to update the example (or make it configurable) and increase the number of concurrent workers as needed.
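To see why a small worker cap reads as "sequential" processing, here is a minimal, self-contained sketch of how such a cap gates the first launch pass: of 40 candidate partitions, only maxWorkers are launched immediately, and the rest wait for a running worker to finish. The class and method names (LaunchGate, launchFirstPass) are invented for illustration; this is not the Spring Cloud Task source.

```java
public class LaunchGate {

    // Simplified model of a capped first launch pass: only up to maxWorkers
    // partitions start immediately; a negative cap means "unlimited".
    // Hypothetical sketch, not actual DeployerPartitionHandler code.
    static int launchFirstPass(int candidates, int maxWorkers) {
        int currentWorkers = 0;
        int launched = 0;
        for (int i = 0; i < candidates; i++) {
            if (currentWorkers < maxWorkers || maxWorkers < 0) {
                launched++;
                currentWorkers++;
            }
        }
        return launched;
    }

    public static void main(String[] args) {
        System.out.println(launchFirstPass(40, 2));  // only 2 of 40 partitions start at once
        System.out.println(launchFirstPass(40, 40)); // all 40 start together
    }
}
```

With the sample's default of 2, the remaining 38 partitions trickle out one at a time as workers complete, which matches the behavior in the logs above.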
As Mahmoud Ben Hassine mentioned in the comments, the workers are launched sequentially:
private void launchWorkers(Set<StepExecution> candidates, Set<StepExecution> executed) {
    for (StepExecution execution : candidates) {
        if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
            launchWorker(execution);
            this.currentWorkers++;
            executed.add(execution);
        }
    }
}
As Glen Renfro mentioned in the comments, an issue has been created for this. This answer will be updated if a solution becomes available for launching workers asynchronously.