Spring Batch with multi-step Spring Cloud Task (PartitionHandler) for Remote Partitioning
Latest update (with an image that will hopefully simplify the question) (thanks @Mahmoud for the feedback)
Related issue reports for reference (after this original post was created, it appears someone filed similar issues against Spring Cloud, so updates are posted there as well):
https://github.com/spring-cloud/spring-cloud-task/issues/793 relates to approach #1
https://github.com/spring-cloud/spring-cloud-task/issues/792 relates to approach #2
A workaround for the issue has also been found and posted on those GitHub issues; this question will be updated once the maintainers confirm everything works as expected:
https://github.com/spring-cloud/spring-cloud-task/issues/793#issuecomment-894617929
I am developing an application with a Spring Batch job that involves multiple steps, but have hit some roadblocks. I have tried studying the documentation and several different approaches without success, so I would like to check whether the community can shed some light on the following setup:
Spring Batch job 1 (receives job parameters for the step 1 / step 2 settings)
Step 1 -> remote partition (PartitionHandler (CPU/memory for step 1 + grid) + Partitioner) with settings from step 1 (job configuration or step configuration)
Step 2 -> remote partition (PartitionHandler (CPU/memory for step 2 + grid) + Partitioner) with settings from step 2 (job configuration or step configuration, different from step 1)
The reason we want this is that different steps need different Kubernetes settings (such as CPU/memory/grid size).
Attempts:
- Create two partition handlers (partitionHandlerReader + partitionHandlerProcessor) and their corresponding launchers (LauncherReader + LauncherProcessor). The main configuration class is simplified into a single class as much as possible:
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_1_two_partitionhandlers/src/main/java/com/example/batchprocessing/BatchConfiguration.java
- Use one PartitionHandler + one TaskLauncher, but with @StepScope for late binding so they change dynamically based on the step and job settings. The main configuration class is again simplified into a single class:
https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope/src/main/java/com/example/batchprocessing/BatchConfiguration.java
Both attempts produce the result below (the full trace is in the Git repos above):
An error is thrown when the job is triggered (the initial launch appears to succeed; the error occurs during execution). The following only appears when there are multiple PartitionHandlers, or when the bean is declared with @StepScope or @JobScope:
java.lang.NullPointerException: null
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
Full log:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.6)
2021-08-06 11:24:29.242 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : Starting BatchProcessingApplication v0.0.1-SNAPSHOT using Java 11.0.7 on localhost.localdomain with PID 90294 (/home/danilko/IdeaProjects/partition/target/batchprocessing-0.0.1-SNAPSHOT.jar started by danilko in /home/danilko/IdeaProjects/partition)
2021-08-06 11:24:29.244 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : The following profiles are active: controller
2021-08-06 11:24:29.790 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-06 11:24:29.794 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-06 11:24:29.797 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-06 11:24:29.833 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.959 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$e6c2be] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.968 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$cc3cccc1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:30.093 INFO 90294 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-08-06 11:24:30.160 INFO 90294 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-08-06 11:24:30.724 INFO 90294 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2021-08-06 11:24:30.736 INFO 90294 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-08-06 11:24:30.974 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : Started BatchProcessingApplication in 2.024 seconds (JVM running for 2.366)
2021-08-06 11:24:30.975 INFO 90294 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2021-08-06 11:24:31.010 INFO 90294 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionedJob-1538890488]] launched with the following parameters: [{}]
Set readerGridSize == 1
2021-08-06 11:24:31.020 INFO 90294 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 22 was run within the task execution 54
2021-08-06 11:24:31.046 INFO 90294 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [partitionReaderStep]
2021-08-06 11:24:31.101 ERROR 90294 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step partitionReaderStep in job partitionedJob-1538890488
java.lang.NullPointerException: null
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
at com.sun.proxy.$Proxy65.handle(Unknown Source) ~[na:na]
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.7.jar!/:5.3.7]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
at com.sun.proxy.$Proxy51.run(Unknown Source) ~[na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:799) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:789) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:346) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.6.jar!/:2.4.6]
at com.example.batchprocessing.BatchProcessingApplication.main(BatchProcessingApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Study/Reference:
Most tutorials I found online only involve a single partitioned step:
https://dataflow.spring.io/docs/feature-guides/batch/partitioning/
Thanks in advance for any info/help.
- Is the above setup even possible?
Yes, nothing prevents you from having two partitioned steps in a single Spring Batch job.
- Is it possible to use JobScope/StepScope to pass info to the PartitionHandler?
Yes, the partition handler can be declared as a job/step scoped bean if it needs the late-binding feature in order to be configured.
Update by @DanilKo, August 14, 2021
The original answer is correct at a high level. However, to actually make the partition handler step scoped, a code change is needed.
Below is the analysis plus my suggested workaround/fix (the maintainers may eventually have a better way to make it work, but so far the fix below works for me).
The issue is being discussed further in:
https://github.com/spring-cloud/spring-cloud-task/issues/793 (multiple partition handler discussion)
https://github.com/spring-cloud/spring-cloud-task/issues/792 (this fix is based on using a step-scoped partition handler to configure different worker steps + resources + max workers)
Root cause analysis (hypothesis)
The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task framework pass in a TaskExecution object as part of the task setup.
But because the partition handler is now in @StepScope (instead of directly at the @Bean level with @EnableTask), or because there are two partition handlers, that setup is no longer triggered: @EnableTask apparently cannot locate the partition handler bean during context creation.
As a result, the created DeployerPartitionHandler faces a null taskExecution when it tries to launch workers (because it was never set).
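The hypothesized mechanism can be sketched in plain Java. This is a simplified illustrative model only, not actual Spring Cloud Task code; the annotation, class, and method names merely mimic the real ones. The point is that an annotation-driven callback is applied only to beans that already exist when the task starts, so a lazily created (step-scoped) instance misses it and keeps a null taskExecution until it is set manually:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;

// Stand-in for Spring Cloud Task's @BeforeTask (illustration only)
@Retention(RetentionPolicy.RUNTIME)
@interface BeforeTask {}

// Stand-in for org.springframework.cloud.task.repository.TaskExecution
class TaskExecution {
    final long executionId;
    TaskExecution(long executionId) { this.executionId = executionId; }
}

// Stand-in for DeployerPartitionHandler
class Handler {
    TaskExecution taskExecution; // stays null unless the callback fires

    @BeforeTask
    public void beforeTask(TaskExecution taskExecution) { this.taskExecution = taskExecution; }

    // Mirrors DeployerPartitionHandler.launchWorker dereferencing taskExecution
    long launchWorker() {
        return Objects.requireNonNull(taskExecution, "taskExecution was never set").executionId;
    }
}

public class BeforeTaskDemo {
    // Like the task lifecycle: invoke @BeforeTask only on beans that exist at task startup
    static void startTask(List<?> beansAtStartup, TaskExecution exec) throws Exception {
        for (Object bean : beansAtStartup) {
            for (Method m : bean.getClass().getMethods()) {
                if (m.isAnnotationPresent(BeforeTask.class)) {
                    m.invoke(bean, exec);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TaskExecution exec = new TaskExecution(54);

        Handler singleton = new Handler();
        startTask(List.of(singleton), exec); // bean present at startup -> callback fires
        singleton.launchWorker();            // works

        Handler stepScoped = new Handler();  // created lazily inside the step -> callback missed
        try {
            stepScoped.launchWorker();
        } catch (NullPointerException e) {
            System.out.println("NPE, as in DeployerPartitionHandler.launchWorker");
        }

        stepScoped.beforeTask(exec);         // the workaround: call it manually
        System.out.println("after manual beforeTask: " + stepScoped.launchWorker());
    }
}
```

Here the manual beforeTask(exec) call plays the same role as the explicit partitionHandler.beforeTask(taskExecution) line in the workaround.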
Workaround
The code below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id.
From there it obtains the TaskExecution and passes it to the deployer partition handler to satisfy its need for a taskExecution reference.
It seems to work, but it is still unclear whether there are other side effects (none found so far during testing).
The fix goes in the partitionHandler method:
@Bean
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer,
@Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
...
// After the declaration of partitionhandler
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
, taskRepository);
// Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
// Set the task execution manually: this partition handler is no longer created at the task
// level, so the @BeforeTask callback is no longer triggered.
// The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have
// the task framework pass in a TaskExecution object as part of the task setup.
// But as this partition handler is now in @StepScope (instead of directly at the @Bean level
// with @EnableTask), that setup never happens, and the created handler would face a null
// taskExecution.
// Below is essentially a workaround that uses the current job execution id to retrieve the
// associated task execution id, obtains that task execution, and passes it to the handler
// to fulfill its need for a taskExecution reference.
// It seems to work, but it is still unclear whether there are other side effects
// (none found so far during testing).
long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
System.out.println("Current execution job to task execution id " + executionId);
TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
partitionHandler.beforeTask(taskExecution);
...
// rest of code continue
(Note that it uses the stepExecution context to find the name of the step currently triggering it and thus assign a different worker step. In this case the worker step name comes from the predefined job execution context, but it could also come from job parameters or elsewhere.)
The job execution context is populated by a job listener.
The job is configured with the job listener:
@Bean(name = "partitionedJob")
@Profile("!worker")
public Job partitionedJob()throws Exception {
Random random = new Random();
return jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(partitionReaderStep())
.listener(jobExecutionListener())
.next(partitionProcessorStep())
.build();
}
The context is populated in the job listener:
@Bean
public JobExecutionListener jobExecutionListener() {
JobExecutionListener listener = new JobExecutionListener(){
@Override
public void beforeJob(JobExecution jobExecution)
{
jobExecution.getExecutionContext().putString("readerCPURequest", "1");
jobExecution.getExecutionContext().putString("readerCPULimit", "2");
jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
// For now using the same image for reader/processor, but if it works, they can be split
jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
jobExecution.getExecutionContext().putString("processorCPURequest", "3");
jobExecution.getExecutionContext().putString("processorCPULimit", "4");
jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
// For now using the same image for reader/processor, but if it works, they will be split
jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));
}
@Override
public void afterJob(JobExecution jobExecution) {
}
};
return listener;
}
Full code (also available in my GitHub repo, after applying the workaround fix): https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/main/src/main/java/com/example/batchprocessing/BatchConfiguration.java
package com.example.batchprocessing;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.deployer.resource.docker.DockerResource;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.deployer.spi.kubernetes.*;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.SystemEnvironmentPropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.StringUtils;
import java.util.*;
@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {
private static int BACK_OFF_LIMIT = 6;
// Set the Kubernetes job name prefix
private String taskName_prefix="partitionedbatchjob";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public JobExplorer jobExplorer;
@Autowired
public JobRepository jobRepository;
@Autowired
public TaskExecutor taskExecutor;
@Autowired
public TaskRepository taskRepository;
@Autowired
public TaskExplorer taskExplorer;
@Autowired
private ConfigurableApplicationContext context;
@Autowired
private DelegatingResourceLoader resourceLoader;
@Autowired
private Environment environment;
@Bean
@StepScope
public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
int targetGridSize = 0;
String step = "";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
{
step = "reader";
}
else
{
step = "processor";
}
targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));
for (int i = 0; i < targetGridSize; i++) {
ExecutionContext context1 = new ExecutionContext();
context1.put("partitionNumber", i);
partitions.put("partition" + i, context1);
}
return partitions;
}
};
}
@Bean
public KubernetesClient kuberentesClient()
{
KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
}
@Bean
@StepScope
public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
{
KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
kubernetesDeployerProperties.setNamespace("default");
kubernetesDeployerProperties.setCreateJob(true);
// Database setup to reference configmap for database info
List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
configMapKeyRefList.add(configMapKeyRef);
kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);
// Set request resource
KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();
String step = "";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
{
step="reader";
}
else
{
step="processor";
}
request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
request.setMemory("2000Mi");
limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
limit.setMemory("3000Mi");
kubernetesDeployerProperties.setRequests(request);
kubernetesDeployerProperties.setLimits(limit);
// As the image is built locally, pull it only if not already present
kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);
// Set task launcher properties to not repeat and not restart
KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();
// https://kubernetes.io/docs/concepts/workloads/controllers/job/
// Set restart policy to Never so no new pod is created on restart
kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
kubernetesTaskLauncherProperties, kuberentesClient());
return kubernetesTaskLauncher;
}
@Bean(name = "partitionedJob")
@Profile("!worker")
public Job partitionedJob()throws Exception {
Random random = new Random();
return jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(partitionReaderStep())
.listener(jobExecutionListener())
.next(partitionProcessorStep())
.build();
}
@Bean(name = "partitionReaderStep")
public Step partitionReaderStep() throws Exception {
return stepBuilderFactory.get("partitionReaderStep")
.partitioner(workerStepReader().getName(), partitioner( null))
.step(workerStepReader())
.partitionHandler(partitionHandler(
taskLauncher( null),
jobExplorer, null))
.build();
}
@Bean(name = "partitionProcessorStep")
public Step partitionProcessorStep() throws Exception {
return stepBuilderFactory.get("partitionProcessorStep")
.partitioner(workerStepProcessor().getName(), partitioner( null))
.step(workerStepProcessor())
.partitionHandler(partitionHandler(
taskLauncher( null),
jobExplorer, null))
.build();
}
@Bean
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer,
@Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
String step ="processor";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
step = "reader";
}
// Use local build image
DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
, taskRepository);
// Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
// Set the task execution manually: this partition handler is no longer created at the task
// level, so the @BeforeTask callback is no longer triggered.
// The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have
// the task framework pass in a TaskExecution object as part of the task setup.
// But as this partition handler is now in @StepScope (instead of directly at the @Bean level
// with @EnableTask), that setup never happens, and the created handler would face a null
// taskExecution.
// Below is essentially a workaround that uses the current job execution id to retrieve the
// associated task execution id, obtains that task execution, and passes it to the handler
// to fulfill its need for a taskExecution reference.
// It seems to work, but it is still unclear whether there are other side effects
// (none found so far during testing).
long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
System.out.println("Current execution job to task execution id " + executionId);
TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
partitionHandler.beforeTask(taskExecution);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler
.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));
partitionHandler.setApplicationName(taskName_prefix + step);
return partitionHandler;
}
@Bean
public JobExecutionListener jobExecutionListener() {
JobExecutionListener listener = new JobExecutionListener(){
@Override
public void beforeJob(JobExecution jobExecution)
{
jobExecution.getExecutionContext().putString("readerCPURequest", "1");
jobExecution.getExecutionContext().putString("readerCPULimit", "2");
jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
// For now using the same image for reader/processor, but if it works, they can be split
jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
jobExecution.getExecutionContext().putString("processorCPURequest", "3");
jobExecution.getExecutionContext().putString("processorCPULimit", "4");
jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
// For now using the same image for reader/processor, but if it works, they will be split
jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));
}
@Override
public void afterJob(JobExecution jobExecution) {
}
};
return listener;
}
@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}
@Bean(name = "workerStepReader")
public Step workerStepReader() {
return this.stepBuilderFactory.get("workerStepReader")
.tasklet(workerTaskletReader(null))
.build();
}
@Bean(name = "workerStepProcessor")
public Step workerStepProcessor() {
return this.stepBuilderFactory.get("workerStepProcessor")
.tasklet(workerTaskletProcessor(null))
.build();
}
@Bean
@StepScope
public Tasklet workerTaskletReader(
final @Value("#{stepExecution}") StepExecution stepExecution) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
System.out.println("This workerTaskletReader ran partition: " + partitionNumber);
return RepeatStatus.FINISHED;
}
};
}
@Bean
@StepScope
public Tasklet workerTaskletProcessor(
final @Value("#{stepExecution}") StepExecution stepExecution) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);
return RepeatStatus.FINISHED;
}
};
}
}
Full log:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.6)
2021-08-06 11:24:29.242 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : Starting BatchProcessingApplication v0.0.1-SNAPSHOT using Java 11.0.7 on localhost.localdomain with PID 90294 (/home/danilko/IdeaProjects/partition/target/batchprocessing-0.0.1-SNAPSHOT.jar started by danilko in /home/danilko/IdeaProjects/partition)
2021-08-06 11:24:29.244 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : The following profiles are active: controller
2021-08-06 11:24:29.790 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-06 11:24:29.794 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-06 11:24:29.797 INFO 90294 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-06 11:24:29.833 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.959 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$e6c2be] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.968 INFO 90294 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$cc3cccc1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:30.093 INFO 90294 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-08-06 11:24:30.160 INFO 90294 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-08-06 11:24:30.724 INFO 90294 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2021-08-06 11:24:30.736 INFO 90294 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-06 11:24:30.897 INFO 90294 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-08-06 11:24:30.974 INFO 90294 --- [ main] c.e.b.BatchProcessingApplication : Started BatchProcessingApplication in 2.024 seconds (JVM running for 2.366)
2021-08-06 11:24:30.975 INFO 90294 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2021-08-06 11:24:31.010 INFO 90294 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionedJob-1538890488]] launched with the following parameters: [{}]
Set readerGridSize == 1
2021-08-06 11:24:31.020 INFO 90294 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 22 was run within the task execution 54
2021-08-06 11:24:31.046 INFO 90294 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [partitionReaderStep]
2021-08-06 11:24:31.101 ERROR 90294 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step partitionReaderStep in job partitionedJob-1538890488
java.lang.NullPointerException: null
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
at com.sun.proxy.$Proxy65.handle(Unknown Source) ~[na:na]
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.7.jar!/:5.3.7]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
at com.sun.proxy.$Proxy51.run(Unknown Source) ~[na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:799) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:789) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:346) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.6.jar!/:2.4.6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.6.jar!/:2.4.6]
at com.example.batchprocessing.BatchProcessingApplication.main(BatchProcessingApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Study/Reference: most tutorials I found online only cover a single partitioned step. https://dataflow.spring.io/docs/feature-guides/batch/partitioning/
Thanks in advance for any info/help.
- Is the above setup even possible?
Yes, there is nothing preventing you from having two partitioned steps within a single Spring Batch job.
- Is it possible to use JobScope/StepScope to pass info to the partition handler?
Yes, if the partition handler needs late binding of its configuration, it can be declared as a job-/step-scoped bean.
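To picture what step-scoped late binding buys here, the sketch below is a plain-Java model (no Spring involved; all class and key names are made up for illustration): a factory method consulted once per step execution can hand each step a differently sized handler from the same shared job context, which is what the reader/processor key prefixes in the configuration enable.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: models how a step-scoped bean can resolve per-step
// settings (grid size, CPU, etc.) at execution time from a shared context,
// instead of being fixed once at configuration time.
public class LateBindingSketch {

    // Hypothetical per-job context, analogous to the JobExecution's ExecutionContext.
    static Map<String, String> jobContext = new HashMap<>();
    static {
        jobContext.put("readerWorkerGridSize", "1");
        jobContext.put("processorWorkerGridSize", "2");
    }

    // A "step-scoped" factory: invoked once per step execution, so each step
    // can receive a differently configured value from the same bean method.
    public static int maxWorkersFor(String stepName) {
        String prefix = stepName.equals("partitionReaderStep") ? "reader" : "processor";
        return Integer.parseInt(jobContext.get(prefix + "WorkerGridSize"));
    }

    public static void main(String[] args) {
        System.out.println(maxWorkersFor("partitionReaderStep"));    // 1
        System.out.println(maxWorkersFor("partitionProcessorStep")); // 2
    }
}
```

A singleton bean, by contrast, would resolve these values exactly once at startup and both steps would share them.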
Update from @DanilKo, August 14, 2021
The original answer is correct at a high level. However, to actually get a step-scoped partition handler working, a code change is required.
Below is the analysis plus my suggested workaround/fix (the code maintainers may eventually have a better way to make it work, but so far the fix below works for me).
The issue is under further discussion in: https://github.com/spring-cloud/spring-cloud-task/issues/793 (multiple partition handler discussion) and https://github.com/spring-cloud/spring-cloud-task/issues/792 (this fix is based on using a step-scoped partition handler to configure different worker steps + resources + max workers)
Root cause analysis (hypothesis)
The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task framework pass in a TaskExecution object as part of task setup.
But because the partition handler is now in @StepScope (instead of being a plain @Bean alongside @EnableTask), or because there are two partition handlers, that setup is no longer triggered: @EnableTask apparently cannot locate the partition handler during context creation.
The created DeployerPartitionHandler therefore faces a null taskExecution when it tries to launch workers (because it was never set).
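A minimal plain-Java model of this failure mode (this is not the Spring Cloud Task code itself; class names are hypothetical): a handler whose required field is only populated by a lifecycle hook throws an NPE when that hook never fires, and pre-setting the field manually, as the workaround below does via beforeTask, avoids it.

```java
// Models why the step-scoped DeployerPartitionHandler hits a
// NullPointerException: its taskExecution field is normally injected by a
// @BeforeTask lifecycle hook, which never fires for step-scoped beans.
public class LifecycleMissSketch {

    static class FakeTaskExecution {
        final long id;
        FakeTaskExecution(long id) { this.id = id; }
    }

    static class FakeHandler {
        private FakeTaskExecution taskExecution; // set by the lifecycle hook, or never

        // Stand-in for the @BeforeTask callback / the manual workaround call.
        void beforeTask(FakeTaskExecution execution) { this.taskExecution = execution; }

        long launchWorker() {
            // Mirrors launchWorker dereferencing taskExecution without a null check.
            return taskExecution.id;
        }
    }

    public static void main(String[] args) {
        FakeHandler scoped = new FakeHandler(); // hook skipped, as in @StepScope
        try {
            scoped.launchWorker();
        } catch (NullPointerException e) {
            System.out.println("NPE without beforeTask, as observed in the log");
        }
        scoped.beforeTask(new FakeTaskExecution(54)); // the manual workaround
        System.out.println("launch ok, task id " + scoped.launchWorker());
    }
}
```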
Workaround
The code below is essentially a workaround that uses the current job execution id to look up the associated task execution id. From there, it retrieves that task execution and passes it to the deployer partition handler to fulfill its need for a taskExecution reference. It seems to work, but it is still unclear whether there are other side effects (so far none found during testing).
The fix goes inside the partitionHandler method:
@Bean
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer,
@Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
...
// After the declaration of partitionhandler
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
, taskRepository);
// Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
// Set the task execution manually: this partition handler is no longer created at the task level, so @BeforeTask is no longer valid
// The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in a TaskExecution object as part of task setup
// But as this partition handler is now at @StepScope (instead of directly at @Bean level with @EnableTask), that setup is no longer triggered
// The resulting DeployerPartitionHandler then faces a null taskExecution
// Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
// From there, get that task execution and pass it to the deployer partition handler to fulfill its need for a taskExecution reference
// It seems to work, but it is still not clear whether there are other side effects (so far none found during testing)
long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
System.out.println("Current execution job to task execution id " + executionId);
TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
partitionHandler.beforeTask(taskExecution);
...
// rest of code continue
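The TaskExplorer calls above can be pictured as a simple id lookup. The sketch below is a plain-Java stand-in (the class and table names only model the real ones; the 22/54 ids are taken from the log further down, where "job execution id 22 was run within the task execution 54"):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the lookup chain the workaround performs: Spring Cloud
// Task keeps a job-execution-id -> task-execution-id association (the
// TASK_TASK_BATCH table), which TaskExplorer exposes, so the current job
// execution can be traced back to the task that launched it.
public class TaskLookupSketch {

    // Stand-in for the TASK_TASK_BATCH association table.
    static Map<Long, Long> jobToTask = new HashMap<>();

    public static long taskExecutionIdByJobExecutionId(long jobExecutionId) {
        Long taskId = jobToTask.get(jobExecutionId);
        if (taskId == null) {
            throw new IllegalStateException("no task execution for job execution " + jobExecutionId);
        }
        return taskId;
    }

    public static void main(String[] args) {
        jobToTask.put(22L, 54L); // job execution 22 ran within task execution 54
        System.out.println(taskExecutionIdByJobExecutionId(22L)); // 54
    }
}
```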
(Note that it uses the stepExecution context to find out the name of the currently triggering step, and therefore assigns a different worker step. In this case the worker step names come from the predefined job execution context, but they could also come from job parameters or elsewhere.)
That job execution context is populated by a job execution listener, and the job is configured with the listener:
@Bean(name = "partitionedJob")
@Profile("!worker")
public Job partitionedJob()throws Exception {
Random random = new Random();
return jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(partitionReaderStep())
.listener(jobExecutionListener())
.next(partitionProcessorStep())
.build();
}
And the context is populated in the job listener:
@Bean
public JobExecutionListener jobExecutionListener() {
JobExecutionListener listener = new JobExecutionListener(){
@Override
public void beforeJob(JobExecution jobExecution)
{
jobExecution.getExecutionContext().putString("readerCPURequest", "1");
jobExecution.getExecutionContext().putString("readerCPULimit", "2");
jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
// For now using the same image for reader/processor, but if it works, they can be split
jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
jobExecution.getExecutionContext().putString("processorCPURequest", "3");
jobExecution.getExecutionContext().putString("processorCPULimit", "4");
jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
// For now using the same image for reader/processor, but if it works, they will be split
jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
System.out.println("Set readerGridSize == " + jobExecution.getExecutionContext().getString("readerGridSize", "IT IS NULL WHICH IS INCORRECT"));
}
@Override
public void afterJob(JobExecution jobExecution) {
}
};
return listener;
}
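The naming convention used throughout (settings keys prefixed with reader/processor selected by the triggering step's name, plus one partition0..n-1 context entry per worker) can be sketched in plain Java, independent of Spring (the helper names here are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the convention above: the triggering step's name
// selects a "reader"/"processor" key prefix, and the partitioner then builds
// one ExecutionContext-like entry per partition up to that step's grid size.
public class PartitionNamingSketch {

    public static String prefixFor(String stepName) {
        return stepName.equalsIgnoreCase("partitionReaderStep") ? "reader" : "processor";
    }

    public static Map<String, Integer> partitions(int gridSize) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            partitions.put("partition" + i, i); // i plays the role of "partitionNumber"
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(prefixFor("partitionReaderStep"));  // reader
        System.out.println(partitions(2).keySet());            // [partition0, partition1]
    }
}
```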
Full code (also available in my GitHub repo, after the workaround fix is applied): https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/main/src/main/java/com/example/batchprocessing/BatchConfiguration.java
package com.example.batchprocessing;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.deployer.resource.docker.DockerResource;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.deployer.spi.kubernetes.*;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.SystemEnvironmentPropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.StringUtils;
import java.util.*;
@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {
private static int BACK_OFF_LIMIT = 6;
// Set the Kubernetes job name
private String taskName_prefix="partitionedbatchjob";
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public JobExplorer jobExplorer;
@Autowired
public JobRepository jobRepository;
@Autowired
public TaskExecutor taskExecutor;
@Autowired
public TaskRepository taskRepository;
@Autowired
public TaskExplorer taskExplorer;
@Autowired
private ConfigurableApplicationContext context;
@Autowired
private DelegatingResourceLoader resourceLoader;
@Autowired
private Environment environment;
@Bean
@StepScope
public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
int targetGridSize = 0;
String step = "";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
{
step = "reader";
}
else
{
step = "processor";
}
targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));
for (int i = 0; i < targetGridSize; i++) {
ExecutionContext context1 = new ExecutionContext();
context1.put("partitionNumber", i);
partitions.put("partition" + i, context1);
}
return partitions;
}
};
}
@Bean
public KubernetesClient kuberentesClient()
{
KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
}
@Bean
@StepScope
public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
{
KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
kubernetesDeployerProperties.setNamespace("default");
kubernetesDeployerProperties.setCreateJob(true);
// Database setup to reference configmap for database info
List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
configMapKeyRefList.add(configMapKeyRef);
configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
configMapKeyRef.setConfigMapName("mariadb");
configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
configMapKeyRefList.add(configMapKeyRef);
kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);
// Set request resource
KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();
String step = "";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
{
step="reader";
}
else
{
step="processor";
}
request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
request.setMemory("2000Mi");
limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
limit.setMemory("3000Mi");
kubernetesDeployerProperties.setRequests(request);
kubernetesDeployerProperties.setLimits(limit);
// the image is built locally, so use IfNotPresent to avoid pulling from a registry
kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);
// Set task launcher properties to not repeat and not restart
KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();
// https://kubernetes.io/docs/concepts/workloads/controllers/job/
// RestartPolicy Never: on failure a new pod is created (up to the backoff limit) instead of restarting the container
kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
kubernetesTaskLauncherProperties, kuberentesClient());
return kubernetesTaskLauncher;
}
@Bean(name = "partitionedJob")
@Profile("!worker")
public Job partitionedJob()throws Exception {
Random random = new Random();
return jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(partitionReaderStep())
.listener(jobExecutionListener())
.next(partitionProcessorStep())
.build();
}
@Bean(name = "partitionReaderStep")
public Step partitionReaderStep() throws Exception {
return stepBuilderFactory.get("partitionReaderStep")
.partitioner(workerStepReader().getName(), partitioner( null))
.step(workerStepReader())
.partitionHandler(partitionHandler(
taskLauncher( null),
jobExplorer, null))
.build();
}
@Bean(name = "partitionProcessorStep")
public Step partitionProcessorStep() throws Exception {
return stepBuilderFactory.get("partitionProcessorStep")
.partitioner(workerStepProcessor().getName(), partitioner( null))
.step(workerStepProcessor())
.partitionHandler(partitionHandler(
taskLauncher( null),
jobExplorer, null))
.build();
}
@Bean
@StepScope
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer,
@Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
String step ="processor";
if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
step = "reader";
}
// Use local build image
DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
, taskRepository);
// Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
// Set the task execution manually: this partition handler is no longer created at the task level, so @BeforeTask is no longer valid
// The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in a TaskExecution object as part of task setup
// But as this partition handler is now at @StepScope (instead of directly at @Bean level with @EnableTask), that setup is no longer triggered
// The resulting DeployerPartitionHandler then faces a null taskExecution
// Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
// From there, get that task execution and pass it to the deployer partition handler to fulfill its need for a taskExecution reference
// It seems to work, but it is still not clear whether there are other side effects (so far none found during testing)
long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
System.out.println("Current execution job to task execution id " + executionId);
TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
partitionHandler.beforeTask(taskExecution);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler
.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));
partitionHandler.setApplicationName(taskName_prefix + step);
return partitionHandler;
}
@Bean
public JobExecutionListener jobExecutionListener() {
JobExecutionListener listener = new JobExecutionListener(){
@Override
public void beforeJob(JobExecution jobExecution)
{
jobExecution.getExecutionContext().putString("readerCPURequest", "1");
jobExecution.getExecutionContext().putString("readerCPULimit", "2");
jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
// For now the reader and processor share the same image; once this works, they can be split
jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
jobExecution.getExecutionContext().putString("processorCPURequest", "3");
jobExecution.getExecutionContext().putString("processorCPULimit", "4");
jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
// For now the reader and processor share the same image; once this works, they will be split
jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));
}
@Override
public void afterJob(JobExecution jobExecution) {
}
};
return listener;
}
@Bean
@Profile("worker")
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
}
@Bean(name = "workerStepReader")
public Step workerStepReader() {
return this.stepBuilderFactory.get("workerStepReader")
.tasklet(workerTaskletReader(null))
.build();
}
@Bean(name = "workerStepProcessor")
public Step workerStepProcessor() {
return this.stepBuilderFactory.get("workerStepProcessor")
.tasklet(workerTaskletProcessor(null))
.build();
}
@Bean
@StepScope
public Tasklet workerTaskletReader(
final @Value("#{stepExecution}") StepExecution stepExecution) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
System.out.println("This workerTaskletReader ran partition: " + partitionNumber);
return RepeatStatus.FINISHED;
}
};
}
@Bean
@StepScope
public Tasklet workerTaskletProcessor(
final @Value("#{stepExecution}") StepExecution stepExecution) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);
return RepeatStatus.FINISHED;
}
};
}
}
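For context on how the worker tasklets get their `partitionNumber`: each `DeployerPartitionHandler` worker runs the step whose name it is given, and reads the `partitionNumber` key from the partition's step execution context. That key has to be put there by the `Partitioner` on the manager side (the partitioner itself is not shown in this excerpt). Below is a minimal plain-Java sketch of that logic, with no Spring dependencies so the shape is easy to see; the class name `PartitionSketch` and the `"partition" + i` naming convention are illustrative assumptions, not code from the repo:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Spring dependencies) of what the manager-side
// Partitioner does: for a given grid size, build one context per worker,
// each carrying the "partitionNumber" key that workerTaskletReader and
// workerTaskletProcessor read back on the worker side.
public class PartitionSketch {

    static Map<String, Map<String, Object>> partition(int gridSize) {
        Map<String, Map<String, Object>> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            Map<String, Object> context = new HashMap<>();
            // This is the key each worker tasklet looks up via getInt("partitionNumber")
            context.put("partitionNumber", i);
            partitions.put("partition" + i, context);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // processorWorkerGridSize is "2" in the jobExecutionListener above,
        // so the processor step would fan out into two partitions
        Map<String, Map<String, Object>> parts = partition(2);
        System.out.println(parts.size());                                    // 2
        System.out.println(parts.get("partition0").get("partitionNumber"));  // 0
    }
}
```

In the real configuration the equivalent `Partitioner` bean would return `Map<String, ExecutionContext>` and, being `@StepScope` like the partition handler, could size itself from the same job execution context keys (for example `readerWorkerGridSize` / `processorWorkerGridSize`).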