从数据库中获取后动态设置块大小

Set chunksize dynamically after fetching from db

我需要在存储在数据库中的 spring 批处理作业的步骤中动态设置块大小,即需要从数据库中获取块大小并将其设置到 bean 中。

我的查询是这样的:

select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'

此处 ID 的值将来自作业参数,该参数是通过随请求传递到 Rest Controller(同时触发批处理作业)

的请求参数设置的

我想从数据库中获取这个 CHUNK_SIZE 并将其动态设置到作业的步骤中。 我们的要求是块大小根据 ID 值的步骤而变化,其详细信息存储在数据库 table 中。例如:

ID CHUNK_SIZE
01 1000
02 2500

我知道作业中的bean是在配置时设置的,作业参数是在运行时触发作业时传递的。

编辑:

MahmoudBenHassine 提供的示例使用 @JobScope 并使用 @Value("#{jobParameters['id']}") 访问步骤 bean 中的 jobParameters。我尝试使用 jobExecutionContext 实现类似的方法,如下所示:

  1. 从 db table 中获取了 chunkSize StepExecutionListener 的 beforeStep 方法并将其设置在 ExecutionContext.

  2. @JobScope注释了步骤bean并使用 @Value("#{jobExecutionContext['chunk']}") 在步骤中访问它 豆.

  3. 但我遇到以下错误:

    Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException

无法从 jobExecutionContext 访问 'chunk' 键值,因此抛出 NullPointerException。 是否需要以某种方式提升它以便在步骤 bean 中访问它?如果是,将不胜感激快速示例或指导。

我的控制器class:

@RestController
public class SampleController {

    @Autowired
    JobLauncher sampleJobLauncher;

    @Autowired
    Job sampleJob;
    
    @GetMapping("/launch")
    public BatchStatus launch(@RequestParam(name = "id", required = true) String id){

        Map<String, JobParameter> map = new HashMap<>();
        map.put("id",  new JobParameter(id));
        map.put("timestamp",  new JobParameter(System.currentTimeMillis));

        JobParameters params = new JobParameters(map);
        JobExecution j = sampleJobLauncher.run(sampleJob, params);

        return j.getStatus();
    }
}   

我的批处理配置class(包含作业和步骤 bean):

@Configuration
public class SampleBatchConfig{

    @Autowired
    private JobBuilderFactory myJobBuilderFactory;

    @Autowired
    private StepBuilderFactory myStepBuilderFactory;

    @Autowired
    private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
    
    @Autowired
    MyReader myReader;
    
    @Autowired
    MyWriter myWriter;
    
    @Bean
    @JobScope
    public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
        return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())
                .listener(new StepExecutionListener() {
                    @Override
                    public void beforeStep(StepExecution stepExecution) {
                        int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
                        stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
                    }

                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        return null;
                    }
                })
                .build();
    }

    @Bean
    public Job job(){
        return myJobBuilderFactory.get("sampleJob")
                .incrementer(new RunIdIncrementer())
                .start(sampleStep(null))
                .build();
    }

}

NOTE: The job may have multiple steps with different chunkSizes, and in that case chunkSize is to be fetched separately for each step.

编辑 2: 如下更改我的步骤定义有效,但存在问题。 这里 reader 读取一个包含 17 个项目的列表,大小为 4。

@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
   return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())  
                .listener(new ChunkListenerSupport() {
                    @Override
                    public void afterChunk(ChunkContext context) {
                        System.out.println("MyJob.afterChunk");
                    }

                    @Override
                    public void beforeChunk(ChunkContext context) {
                        System.out.println("MyJob.beforeChunk");
                    }
                })                      
                .build();
}

我第一次从 url 触发作业时,它工作正常并打印以下内容:(块大小在数据库 table 中设置为 4)

2021-05-03 15:06:44.859  INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]
MyJob.beforeChunk

item = 1

item = 2

item = 3

item = 4

MyJob.afterChunk

MyJob.beforeChunk

item = 5

item = 6

item = 7

item = 8

MyJob.afterChunk

MyJob.beforeChunk

item = 9

item = 10

item = 11

item = 12

MyJob.afterChunk

MyJob.beforeChunk

item = 13

item = 14

item = 15

item = 16

MyJob.afterChunk

MyJob.beforeChunk

item = 17

MyJob.afterChunk

但是如果我再次触发作业,而不重新启动 server/spring 容器,则会打印以下内容:

2021-05-03 15:11:02.427  INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]

MyJob.beforeChunk

MyJob.afterChunk

简而言之,当服务器重新启动时,它只工作一次。但是在不重启服务器的情况下,它对后续的作业执行不起作用。

由于您将 ID 作为作业参数传递,并且您希望在配置步骤时根据该 ID 从数据库中动态获取块大小,因此您可以使用作业范围的步骤,如下所示:

@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
   return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())                        
                .build();
}