从数据库中获取后动态设置块大小
Set chunksize dynamically after fetching from db
我需要在存储在数据库中的 spring 批处理作业的步骤中动态设置块大小,即需要从数据库中获取块大小并将其设置到 bean 中。
我的查询是这样的:
select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'
此处 ID
的值将来自作业参数,该参数是通过随请求传递到 Rest Controller
(同时触发批处理作业)
的请求参数设置的
我想从数据库中获取这个 CHUNK_SIZE
并将其动态设置到作业的步骤中。
我们的要求是块大小根据 ID 值的步骤而变化,其详细信息存储在数据库 table 中。例如:
ID
CHUNK_SIZE
01
1000
02
2500
我知道作业中的bean是在配置时设置的,作业参数是在运行时触发作业时传递的。
编辑:
MahmoudBenHassine 提供的示例使用 @JobScope
并使用 @Value("#{jobParameters['id']}")
访问步骤 bean 中的 jobParameters。我尝试使用 jobExecutionContext
实现类似的方法,如下所示:
从 db table 中获取了 chunkSize
StepExecutionListener 的 beforeStep
方法并将其设置在
ExecutionContext
.
用@JobScope
注释了步骤bean并使用
@Value("#{jobExecutionContext['chunk']}")
在步骤中访问它
豆.
但我遇到以下错误:
Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException
无法从 jobExecutionContext
访问 'chunk' 键值,因此抛出 NullPointerException
。
是否需要以某种方式提升它以便在步骤 bean 中访问它?如果是,将不胜感激快速示例或指导。
我的控制器class:
@RestController
public class SampleController {
@Autowired
JobLauncher sampleJobLauncher;
@Autowired
Job sampleJob;
@GetMapping("/launch")
public BatchStatus launch(@RequestParam(name = "id", required = true) String id){
Map<String, JobParameter> map = new HashMap<>();
map.put("id", new JobParameter(id));
map.put("timestamp", new JobParameter(System.currentTimeMillis));
JobParameters params = new JobParameters(map);
JobExecution j = sampleJobLauncher.run(sampleJob, params);
return j.getStatus();
}
}
我的批处理配置class(包含作业和步骤 bean):
@Configuration
public class SampleBatchConfig{
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
@Autowired
MyReader myReader;
@Autowired
MyWriter myWriter;
@Bean
@JobScope
public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.build();
}
@Bean
public Job job(){
return myJobBuilderFactory.get("sampleJob")
.incrementer(new RunIdIncrementer())
.start(sampleStep(null))
.build();
}
}
NOTE:
The job may have multiple steps with different chunkSizes, and in that case chunkSize is to be fetched separately for each step.
编辑 2:
如下更改我的步骤定义有效,但存在问题。
这里 reader 读取一个包含 17 个项目的列表,大小为 4。
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new ChunkListenerSupport() {
@Override
public void afterChunk(ChunkContext context) {
System.out.println("MyJob.afterChunk");
}
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("MyJob.beforeChunk");
}
})
.build();
}
我第一次从 url 触发作业时,它工作正常并打印以下内容:(块大小在数据库 table 中设置为 4)
2021-05-03 15:06:44.859 INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
item = 1
item = 2
item = 3
item = 4
MyJob.afterChunk
MyJob.beforeChunk
item = 5
item = 6
item = 7
item = 8
MyJob.afterChunk
MyJob.beforeChunk
item = 9
item = 10
item = 11
item = 12
MyJob.afterChunk
MyJob.beforeChunk
item = 13
item = 14
item = 15
item = 16
MyJob.afterChunk
MyJob.beforeChunk
item = 17
MyJob.afterChunk
但是如果我再次触发作业,而不重新启动 server/spring 容器,则会打印以下内容:
2021-05-03 15:11:02.427 INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
MyJob.afterChunk
简而言之,当服务器重新启动时,它只工作一次。但是在不重启服务器的情况下,它对后续的作业执行不起作用。
由于您将 ID 作为作业参数传递,并且您希望在配置步骤时根据该 ID 从数据库中动态获取块大小,因此您可以使用作业范围的步骤,如下所示:
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}
我需要在存储在数据库中的 spring 批处理作业的步骤中动态设置块大小,即需要从数据库中获取块大小并将其设置到 bean 中。
我的查询是这样的:
select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'
此处 ID
的值将来自作业参数,该参数是通过随请求传递到 Rest Controller
(同时触发批处理作业)
我想从数据库中获取这个 CHUNK_SIZE
并将其动态设置到作业的步骤中。
我们的要求是块大小根据 ID 值的步骤而变化,其详细信息存储在数据库 table 中。例如:
ID | CHUNK_SIZE |
---|---|
01 | 1000 |
02 | 2500 |
我知道作业中的bean是在配置时设置的,作业参数是在运行时触发作业时传递的。
编辑:
MahmoudBenHassine 提供的示例使用 @JobScope
并使用 @Value("#{jobParameters['id']}")
访问步骤 bean 中的 jobParameters。我尝试使用 jobExecutionContext
实现类似的方法,如下所示:
从 db table 中获取了 chunkSize StepExecutionListener 的
beforeStep
方法并将其设置在ExecutionContext
.用
@JobScope
注释了步骤bean并使用@Value("#{jobExecutionContext['chunk']}")
在步骤中访问它 豆.但我遇到以下错误:
Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException
无法从 jobExecutionContext
访问 'chunk' 键值,因此抛出 NullPointerException
。
是否需要以某种方式提升它以便在步骤 bean 中访问它?如果是,将不胜感激快速示例或指导。
我的控制器class:
@RestController
public class SampleController {
@Autowired
JobLauncher sampleJobLauncher;
@Autowired
Job sampleJob;
@GetMapping("/launch")
public BatchStatus launch(@RequestParam(name = "id", required = true) String id){
Map<String, JobParameter> map = new HashMap<>();
map.put("id", new JobParameter(id));
map.put("timestamp", new JobParameter(System.currentTimeMillis));
JobParameters params = new JobParameters(map);
JobExecution j = sampleJobLauncher.run(sampleJob, params);
return j.getStatus();
}
}
我的批处理配置class(包含作业和步骤 bean):
@Configuration
public class SampleBatchConfig{
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
@Autowired
MyReader myReader;
@Autowired
MyWriter myWriter;
@Bean
@JobScope
public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.build();
}
@Bean
public Job job(){
return myJobBuilderFactory.get("sampleJob")
.incrementer(new RunIdIncrementer())
.start(sampleStep(null))
.build();
}
}
NOTE: The job may have multiple steps with different chunkSizes, and in that case chunkSize is to be fetched separately for each step.
编辑 2: 如下更改我的步骤定义有效,但存在问题。 这里 reader 读取一个包含 17 个项目的列表,大小为 4。
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new ChunkListenerSupport() {
@Override
public void afterChunk(ChunkContext context) {
System.out.println("MyJob.afterChunk");
}
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("MyJob.beforeChunk");
}
})
.build();
}
我第一次从 url 触发作业时,它工作正常并打印以下内容:(块大小在数据库 table 中设置为 4)
2021-05-03 15:06:44.859 INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
item = 1
item = 2
item = 3
item = 4
MyJob.afterChunk
MyJob.beforeChunk
item = 5
item = 6
item = 7
item = 8
MyJob.afterChunk
MyJob.beforeChunk
item = 9
item = 10
item = 11
item = 12
MyJob.afterChunk
MyJob.beforeChunk
item = 13
item = 14
item = 15
item = 16
MyJob.afterChunk
MyJob.beforeChunk
item = 17
MyJob.afterChunk
但是如果我再次触发作业,而不重新启动 server/spring 容器,则会打印以下内容:
2021-05-03 15:11:02.427 INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
MyJob.afterChunk
简而言之,当服务器重新启动时,它只工作一次。但是在不重启服务器的情况下,它对后续的作业执行不起作用。
由于您将 ID 作为作业参数传递,并且您希望在配置步骤时根据该 ID 从数据库中动态获取块大小,因此您可以使用作业范围的步骤,如下所示:
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}