Spring Batch with multi-step Spring Cloud Task (PartitionHandler) for Remote Partition

Latest update (with an image that will hopefully simplify the question) (thanks @Mahmoud for the feedback)

Related issue reports for reference (after this original post was created, someone appears to have filed similar issues against Spring Cloud Task, so updates were also posted there):

https://github.com/spring-cloud/spring-cloud-task/issues/793 is related to approach #1

https://github.com/spring-cloud/spring-cloud-task/issues/792 is related to approach #2

A workaround for the issue has also been found and posted on that GitHub issue; this question will be updated once the developers confirm everything is in order: https://github.com/spring-cloud/spring-cloud-task/issues/793#issuecomment-894617929

I am working on an application that involves a multi-step Spring Batch job and have run into some roadblocks. I tried researching the documentation and different approaches without success, so I wanted to check whether the community can shed some light on the following setup:

Spring Batch Job 1 (receives job parameters for the Step 1 / Step 2 settings)

    Step 1 -> remote partition (partitionHandler (CPU/memory for step 1 + grid) + partitioner) with settings from step 1 (job configuration or step configuration)

    Step 2 -> remote partition (partitionHandler (CPU/memory for step 2 + grid) + partitioner) with settings from step 2 (job configuration or step configuration, different from step 1)

The reason we want this is that different steps need different Kubernetes settings (such as CPU/memory/grid size).
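
Conceptually (not working code; the step names below are placeholders), the desired job would look something like this, with each partitioned step carrying its own deployment settings:

    // Desired structure: one job, two remote-partitioned steps with different k8s settings
    jobBuilderFactory.get("job1")
            .start(step1RemotePartitionedStep)   // uses CPU/memory/grid settings for step 1
            .next(step2RemotePartitionedStep)    // uses different CPU/memory/grid settings for step 2
            .build();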

Attempts:

  1. Create two partition handlers (partitionHandlerReader + partitionHandlerProcessor) and their corresponding launchers (LauncherReader + LauncherProcessor)

The full project can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_1_two_partitionhandlers

The main configuration class, simplified into a single class as much as possible: https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_1_two_partitionhandlers/src/main/java/com/example/batchprocessing/BatchConfiguration.java

  2. Use one PartitionHandler + one TaskLauncher, but with @StepScope for late binding so the configuration changes dynamically based on the step and job settings

The full project can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope

The main configuration class, simplified into a single class as much as possible: https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope/src/main/java/com/example/batchprocessing/BatchConfiguration.java

Both attempts produce the result below (the full trace is in the Git repos above):

The error is reported when the job is triggered (the initial launch appears to succeed; the error occurs during execution). The following only happens when there are multiple PartitionHandlers or when the bean is at @StepScope/@JobScope:

java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]

Full log:

  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.6)

2021-08-06 11:24:29.242  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Starting BatchProcessingApplication v0.0.1-SNAPSHOT using Java 11.0.7 on localhost.localdomain with PID 90294 (/home/danilko/IdeaProjects/partition/target/batchprocessing-0.0.1-SNAPSHOT.jar started by danilko in /home/danilko/IdeaProjects/partition)
2021-08-06 11:24:29.244  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : The following profiles are active: controller
2021-08-06 11:24:29.790  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-06 11:24:29.794  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-06 11:24:29.797  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-06 11:24:29.833  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.959  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$e6c2be] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.968  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$cc3cccc1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:30.093  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-08-06 11:24:30.160  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-08-06 11:24:30.724  INFO 90294 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-08-06 11:24:30.736  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-06 11:24:30.974  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Started BatchProcessingApplication in 2.024 seconds (JVM running for 2.366)
2021-08-06 11:24:30.975  INFO 90294 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2021-08-06 11:24:31.010  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionedJob-1538890488]] launched with the following parameters: [{}]
Set readerGridSize == 1
2021-08-06 11:24:31.020  INFO 90294 --- [           main] o.s.c.t.b.l.TaskBatchExecutionListener   : The job execution id 22 was run within the task execution 54
2021-08-06 11:24:31.046  INFO 90294 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionReaderStep]
2021-08-06 11:24:31.101 ERROR 90294 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step partitionReaderStep in job partitionedJob-1538890488

java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at com.sun.proxy.$Proxy65.handle(Unknown Source) ~[na:na]
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.7.jar!/:5.3.7]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at com.sun.proxy.$Proxy51.run(Unknown Source) ~[na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:799) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:789) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:346) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at com.example.batchprocessing.BatchProcessingApplication.main(BatchProcessingApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]


Study/Reference: most tutorials I found online only cover a single partitioned step. https://dataflow.spring.io/docs/feature-guides/batch/partitioning/

Thanks in advance for any info/help.

  1. Is the above setup even possible?

Yes, nothing prevents you from having two partitioned steps within a single Spring Batch job.

  2. Is it possible to use JobScope/StepScope to pass info to the partition handler?

Yes, the partition handler can be declared as a job-/step-scoped bean if it needs the late-binding feature for its configuration.
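
For illustration, a minimal sketch of such a step-scoped declaration (a condensed excerpt of the full configuration further below; taskRepository is autowired elsewhere, and the *WorkerImage/*WorkerStep/*WorkerGridSize keys are the ones populated into the job execution context by the job listener). Note that, as the update below explains, this alone still fails with a NullPointerException until the beforeTask workaround is applied.

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                             JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) {
        // Late binding: pick the per-step settings based on the current step name
        String step = stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep") ? "reader" : "processor";

        DockerResource resource = new DockerResource(
                stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));

        DeployerPartitionHandler handler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep"), taskRepository);
        handler.setMaxWorkers(Integer.parseInt(
                stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        return handler;
    }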

Updated by @DanilKo on August 14, 2021

The original answer is correct at a high level. However, to actually get the partition handler working at step scope, the code needs to be modified.

Below is the analysis plus my suggested workaround/fix (maybe the code maintainers will eventually have a better way to make it work, but so far the fix below works for me).

The issue is being discussed further in: https://github.com/spring-cloud/spring-cloud-task/issues/793 (multiple partition handler discussion) and https://github.com/spring-cloud/spring-cloud-task/issues/792 (this fix is based on using the partition handler at step scope to configure different worker steps + resources + max workers)

Root cause analysis (hypothesis)

The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task framework pass in the TaskExecution object as part of the task setup.

But because this partition handler is now at @StepScope (instead of directly at @Bean level with @EnableTask), or because there are two partition handlers, that setup is no longer triggered, since @EnableTask appears unable to find a partition handler during context creation:

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java@269

The created DeployerPartitionHandler then faces a null taskExecution when it tries to launch workers (because it was never set):

https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java@347
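To make the mechanism concrete, the relevant part of DeployerPartitionHandler has roughly the following shape (a simplified paraphrase of the source linked above, not the exact code):

    // Simplified paraphrase of the relevant part of DeployerPartitionHandler
    // (see the links above for the real source)
    public class DeployerPartitionHandler /* implements PartitionHandler, ... */ {

        private TaskExecution taskExecution;

        // Invoked by the task lifecycle set up by @EnableTask when the task starts.
        // For a @StepScope (or second) partition handler bean this callback is never
        // triggered, so taskExecution stays null ...
        @BeforeTask
        public void beforeTask(TaskExecution taskExecution) {
            this.taskExecution = taskExecution;
        }

        // ... and launchWorker(...) later dereferences the null taskExecution,
        // producing the NullPointerException shown in the stack trace above.
    }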

Workaround

The workaround below essentially uses the current job execution id to retrieve the associated task execution id. From there, it gets that task execution and passes it to the deployer partition handler to fulfill its need for a taskExecution reference. It seems to work, but it is still not clear whether there are other side effects (so far none have been found during testing).

The full code can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope_workaround_resolution

In the partitionHandler method:

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

...

      // After the declaration of partitionhandler
        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the task execution manually, as this partition handler is no longer created at the task level, so @BeforeTask is no longer triggered
        // The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in the TaskExecution object as part of the task setup
        // But as this partitionHandler is now at @StepScope (instead of directly at @Bean level with @EnableTask), that setup is no longer triggered
        // As a result, the created DeployerPartitionHandler faces a null taskExecution

        // Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
        // From there, get that task execution and pass it to the deployer handler to fulfill its need for a taskExecution reference
        // It seems to work, but it is still not clear whether there are other side effects (none found so far during testing)
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);
...

// rest of code continue

(Note that it utilizes the stepExecution context to find out the current triggering step name and thus assign a different worker step. In this case the worker step name comes from the predefined job execution context, but it could also come from job parameters or elsewhere.)
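
As a hypothetical variant (not in the original repo), the same late binding could pull the worker step name from job parameters instead of the job execution context, assuming a job parameter named workerStep is supplied when the job is launched:

    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                             JobExplorer jobExplorer,
                                             @Value("#{jobParameters['workerStep']}") String workerStepName) throws Exception {
        // workerStepName is late-bound from the job parameters at step execution time
        DockerResource resource = new DockerResource("worker:latest");
        DeployerPartitionHandler handler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, workerStepName, taskRepository);
        // ... remaining configuration (beforeTask workaround, max workers, etc.) as shown above
        return handler;
    }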

The job execution context is populated by a job listener.

The job is configured with the listener:


    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

The listener populates the context:


    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");

            // For now using same image for reader/processor, but if it work, can split them
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");

            // For now using same image for reader/processor, but if it work, will split them
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerGridSize == " + jobExecution.getExecutionContext().getString("readerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }

Full code (also available in my GitHub repo, after applying the workaround fix): https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/main/src/main/java/com/example/batchprocessing/BatchConfiguration.java

package com.example.batchprocessing;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.JobList;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.deployer.resource.docker.DockerResource;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.deployer.spi.kubernetes.*;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskExplorer;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.SystemEnvironmentPropertySource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.StringUtils;

import java.util.*;


@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {

    private static int BACK_OFF_LIMIT = 6;

    // Set the kubernetes job name
    private String taskName_prefix="partitionedbatchjob";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExplorer jobExplorer;

    @Autowired
    public JobRepository jobRepository;

    @Autowired
    public TaskExecutor taskExecutor;

    @Autowired
    public TaskRepository taskRepository;

    @Autowired
    public TaskExplorer taskExplorer;

    @Autowired
    private ConfigurableApplicationContext context;

    @Autowired
    private DelegatingResourceLoader resourceLoader;

    @Autowired
    private Environment environment;

    @Bean
    @StepScope
    public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {

                Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

                int targetGridSize = 0;
                String step = "";
                if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                {
                    step = "reader";
                }
                else
                {
                    step = "processor";
                }

                targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));

                for (int i = 0; i < targetGridSize; i++) {
                    ExecutionContext context1 = new ExecutionContext();
                    context1.put("partitionNumber", i);

                    partitions.put("partition" + i, context1);
                }

                return partitions;
            }
        };
    }

    @Bean
    public KubernetesClient kuberentesClient()
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();

        return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
    }


    @Bean
    @StepScope
    public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
    {
        KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
        kubernetesDeployerProperties.setNamespace("default");

        kubernetesDeployerProperties.setCreateJob(true);

        // Database setup to reference configmap for database info
        List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
        KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
        configMapKeyRefList.add(configMapKeyRef);

        configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
        configMapKeyRef.setConfigMapName("mariadb");
        configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
        configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
        configMapKeyRefList.add(configMapKeyRef);


        kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);

        // Set request resource
        KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
        KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();

        String step = "";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
        {
            step="reader";
        }
        else
        {
            step="processor";
        }

        request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
        request.setMemory("2000Mi");


        limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
        limit.setMemory("3000Mi");


        kubernetesDeployerProperties.setRequests(request);
        kubernetesDeployerProperties.setLimits(limit);

        // as build on local image, so need to use local
        kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);

        // Set task launcher properties to not repeat and not restart
        KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();

        // https://kubernetes.io/docs/concepts/workloads/controllers/job/
        // Set to never to create new pod on restart
        kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
        kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
        KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
                kubernetesTaskLauncherProperties, kuberentesClient());

        return kubernetesTaskLauncher;
    }


    @Bean(name = "partitionedJob")
    @Profile("!worker")
    public Job partitionedJob()throws Exception {
        Random random = new Random();
        return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(partitionReaderStep())
                .listener(jobExecutionListener())
                .next(partitionProcessorStep())
                .build();
    }

    @Bean(name = "partitionReaderStep")
    public Step partitionReaderStep() throws Exception {

        return stepBuilderFactory.get("partitionReaderStep")
                .partitioner(workerStepReader().getName(),  partitioner( null))
                .step(workerStepReader())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }

    @Bean(name = "partitionProcessorStep")
    public Step partitionProcessorStep() throws Exception {

        return stepBuilderFactory.get("partitionProcessorStep")
                .partitioner(workerStepProcessor().getName(), partitioner( null))
                .step(workerStepProcessor())
                .partitionHandler(partitionHandler(
                        taskLauncher( null),
                        jobExplorer, null))
                .build();
    }


    @Bean
    @StepScope
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                   JobExplorer jobExplorer,
                                             @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {

        String step ="processor";

        if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
            step = "reader";
        }

        // Use local build image
        DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));


        DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                        stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                        , taskRepository);

        // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
        // Set the task execution manually, as this partition handler is no longer created at the task level, so @BeforeTask is no longer triggered
        // The problem is that DeployerPartitionHandler relies on the @BeforeTask annotation to have the task pass in the TaskExecution object as part of the task setup
        // But as this partitionHandler is now at @StepScope (instead of directly at @Bean level with @EnableTask), that setup is no longer triggered
        // As a result, the created DeployerPartitionHandler faces a null taskExecution

        // Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id
        // From there, get that task execution and pass it to the deployer handler to fulfill its need for a taskExecution reference
        // It seems to work, but it is still not clear whether there are other side effects (none found so far during testing)
        long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());

        System.out.println("Current execution job to task execution id " + executionId);
        TaskExecution taskExecution = taskExplorer.getTaskExecution(taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId()));
        System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
        partitionHandler.beforeTask(taskExecution);

        List<String> commandLineArgs = new ArrayList<>(3);
        commandLineArgs.add("--spring.profiles.active=worker");
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");
        partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
        partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));

        partitionHandler.setApplicationName(taskName_prefix + step);

        return partitionHandler;
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
    JobExecutionListener listener = new JobExecutionListener(){
        @Override
        public void beforeJob(JobExecution jobExecution)
        {
            jobExecution.getExecutionContext().putString("readerCPURequest", "1");
            jobExecution.getExecutionContext().putString("readerCPULimit", "2");

            jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");

            // For now using same image for reader/processor, but if it work, can split them
            jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");

            jobExecution.getExecutionContext().putString("processorCPURequest", "3");
            jobExecution.getExecutionContext().putString("processorCPULimit", "4");

            jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");

            // For now using same image for reader/processor, but if it work, will split them
            jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
            jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");

            System.out.println("Set readerGridSize == " + jobExecution.getExecutionContext().getString("readerGridSize", "IT IS NULL WHICH IS INCORRECT"));

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };

    return listener;
    }

    @Bean
    @Profile("worker")
    public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
        return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
    }

    @Bean(name = "workerStepReader")
    public Step workerStepReader() {
        return this.stepBuilderFactory.get("workerStepReader")
                .tasklet(workerTaskletReader(null))
                .build();
    }

    @Bean(name = "workerStepProcessor")
    public Step workerStepProcessor() {
        return this.stepBuilderFactory.get("workerStepProcessor")
                .tasklet(workerTaskletProcessor(null))
                .build();
    }



    @Bean
    @StepScope
    public Tasklet workerTaskletReader(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletReader ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    @StepScope
    public Tasklet workerTaskletProcessor(
            final @Value("#{stepExecution}") StepExecution stepExecution) {

        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);

                return RepeatStatus.FINISHED;
            }
        };
    }
}