如何在 Spring Boot Web 应用程序中,从 Spring Batch 作业的 ItemReader 类访问作业参数
How to access jobparameter from ItemReader class of Spring batch job in spring boot web application
我正在从控制器启动一个 Spring Batch 作业。我需要在通过基于 Java 的配置定义的自定义 ItemReader 中
访问作业参数 fileName,
但我在 ItemReader 中拿到的 fileName 始终为 null。请帮忙。
我这样启动是因为我必须根据用户请求更改 ItemReader 文件。
我的控制器:
/**
 * REST controller that assembles and launches a Spring Batch job on each
 * GET request to "/".
 *
 * <p>The job is built per request and started with a {@code fileName} job
 * parameter; in this snippet the parameter value is a fixed placeholder string.
 */
@RestController
@RequestMapping(value = "/")
public class SimpleController {
    /** The single step added to the job that is launched. */
    @Autowired
    Step step;
    @Autowired
    private JobBuilderFactory jobs;
    @Autowired
    private StepBuilderFactory steps;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    JobLauncher jobLauncher;

    /**
     * Builds a {@link SimpleJob} named "test" containing the injected step and
     * runs it through the {@link JobLauncher} with a {@code fileName} parameter.
     *
     * @throws JobParametersInvalidException propagated from {@code jobLauncher.run}
     * @throws JobExecutionAlreadyRunningException propagated from {@code jobLauncher.run}
     * @throws JobRestartException propagated from {@code jobLauncher.run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@code jobLauncher.run}
     */
    @RequestMapping(method = RequestMethod.GET)
    public void StartJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // Typed map: avoids the raw-type/unchecked call into the JobParameters constructor.
        HashMap<String, JobParameter> map = new HashMap<String, JobParameter>();
        map.put("fileName", new JobParameter("ValueBeingSetFromRequestHeader"));
        // Build the parameters once and reuse them for both the lookup and the launch.
        JobParameters jobParameters = new JobParameters(map);

        SimpleJob job = new SimpleJob("test");
        job.addStep(step);
        System.out.println(jobRepository);
        job.setJobRepository(jobRepository);
        System.out.println(jobRepository.getLastJobExecution(job.getName(), jobParameters));
        jobLauncher.run(job, jobParameters);
    }
}
我的作业步骤配置类如下:
/**
 * Bean definitions for the batch step: reader, processor, writer, the step
 * itself, and the step/job scopes.
 *
 * <p>NOTE(review): the accompanying question reports that {@code fileName} is
 * always null when {@link #reader()} runs. According to the Spring Batch
 * late-binding documentation, {@code jobParameters} expressions are resolved
 * only for step- or job-scoped beans; confirm, and consider injecting the
 * parameter into a step-scoped reader bean instead of this setter.
 */
public class ProducerBatchJobConfigurer {
    /** Path of the input file; expected to come from the {@code fileName} job parameter. */
    private String fileName;

    /** @return the currently configured input file name, possibly {@code null} */
    public String getFileName() {
        return fileName;
    }

    /**
     * Sets the input file name.
     *
     * @param fileName value of the {@code fileName} job parameter
     */
    @Value("#{jobParameters['fileName']}")
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /**
     * Creates the item reader.
     *
     * @return a {@code DummyReader} when no file name is set; otherwise a
     *     {@code FlatFileItemReader} over that file whose lines are tokenized
     *     by a {@code DelimitedLineTokenizer} and mapped by {@code CSVFieldMapper}
     */
    @Bean
    public ItemReader reader() {
        System.out.println(fileName);
        if (fileName == null) {
            return new DummyReader();
        }
        FlatFileItemReader<KafkaMessage> flatFileReader = new FlatFileItemReader<KafkaMessage>();
        // The resource only needs to be set once (the original set it twice).
        flatFileReader.setResource(new FileSystemResource(new File(fileName)));
        DefaultLineMapper<KafkaMessage> lineMapper = new DefaultLineMapper<KafkaMessage>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
        lineMapper.setFieldSetMapper(new CSVFieldMapper());
        flatFileReader.setLineMapper(lineMapper);
        return flatFileReader;
    }

    /** @return a new {@code CustomItemProcessor} */
    @Bean
    public ItemProcessor processor() {
        return new CustomItemProcessor();
    }

    /** @return a new {@code KafkaWriter} */
    @Bean
    public ItemWriter writer() {
        return new KafkaWriter();
    }

    /**
     * Builds the step named "step1" with a chunk size of 10, wiring together
     * the given reader, processor and writer.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader item reader for the step
     * @param writer item writer for the step
     * @param processor item processor for the step
     * @return the configured step
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
            ItemReader reader, ItemWriter writer,
            ItemProcessor processor) {
        System.out.println("ddddddddddddddd");
        return stepBuilderFactory.get("step1")
                .chunk(10).reader(reader)
                .processor(processor).writer(writer).build();
    }

    /** @return a {@code StepScope} with auto-proxying switched on */
    @Bean
    public StepScope stepScope() {
        final StepScope stepScope = new StepScope();
        stepScope.setAutoProxy(true);
        return stepScope;
    }

    /** @return a new {@code JobScope} */
    @Bean
    public JobScope jobScope() {
        return new JobScope();
    }
}
使用@Value("#{jobParameters[fileName]}")
代替@Value("#{jobParameters['fileName']}")
@Bean
public ItemReader reader() {
...
}
应该改为
@Bean
public ItemStreamReader reader() {
...
}
甚至更具体
@Bean
public FlatFileItemReader reader() {
...
}
步骤定义也应更改为
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemStreamReader reader, ItemWriter writer,
ItemProcessor processor) {
}
为什么有必要?
如果 itemReader 实现了 ItemStream 接口(或 ItemStreamReader),Spring 就会按 ItemStream 来处理它。
A reader, writer, or processor that is directly wired into the Step
will be registered automatically if it implements ItemStream or a
StepListener interface.
这意味着 Spring 会调用它的 open、update 和 close 方法。
类似问题 Spring Batch: org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read
我正在从控制器启动一个 Spring Batch 作业。我需要在通过基于 Java 的配置定义的自定义 ItemReader 中
访问作业参数 fileName,
但我在 ItemReader 中拿到的 fileName 始终为 null。请帮忙。
我这样启动是因为我必须根据用户请求更改 ItemReader 文件。
我的控制器:
/**
 * REST controller that assembles and launches a Spring Batch job on each
 * GET request to "/".
 *
 * <p>The job is built per request and started with a {@code fileName} job
 * parameter; in this snippet the parameter value is a fixed placeholder string.
 */
@RestController
@RequestMapping(value = "/")
public class SimpleController {
    /** The single step added to the job that is launched. */
    @Autowired
    Step step;
    @Autowired
    private JobBuilderFactory jobs;
    @Autowired
    private StepBuilderFactory steps;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    JobLauncher jobLauncher;

    /**
     * Builds a {@link SimpleJob} named "test" containing the injected step and
     * runs it through the {@link JobLauncher} with a {@code fileName} parameter.
     *
     * @throws JobParametersInvalidException propagated from {@code jobLauncher.run}
     * @throws JobExecutionAlreadyRunningException propagated from {@code jobLauncher.run}
     * @throws JobRestartException propagated from {@code jobLauncher.run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@code jobLauncher.run}
     */
    @RequestMapping(method = RequestMethod.GET)
    public void StartJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // Typed map: avoids the raw-type/unchecked call into the JobParameters constructor.
        HashMap<String, JobParameter> map = new HashMap<String, JobParameter>();
        map.put("fileName", new JobParameter("ValueBeingSetFromRequestHeader"));
        // Build the parameters once and reuse them for both the lookup and the launch.
        JobParameters jobParameters = new JobParameters(map);

        SimpleJob job = new SimpleJob("test");
        job.addStep(step);
        System.out.println(jobRepository);
        job.setJobRepository(jobRepository);
        System.out.println(jobRepository.getLastJobExecution(job.getName(), jobParameters));
        jobLauncher.run(job, jobParameters);
    }
}
我的作业步骤配置类如下:
/**
 * Bean definitions for the batch step: reader, processor, writer, the step
 * itself, and the step/job scopes.
 *
 * <p>NOTE(review): the accompanying question reports that {@code fileName} is
 * always null when {@link #reader()} runs. According to the Spring Batch
 * late-binding documentation, {@code jobParameters} expressions are resolved
 * only for step- or job-scoped beans; confirm, and consider injecting the
 * parameter into a step-scoped reader bean instead of this setter.
 */
public class ProducerBatchJobConfigurer {
    /** Path of the input file; expected to come from the {@code fileName} job parameter. */
    private String fileName;

    /** @return the currently configured input file name, possibly {@code null} */
    public String getFileName() {
        return fileName;
    }

    /**
     * Sets the input file name.
     *
     * @param fileName value of the {@code fileName} job parameter
     */
    @Value("#{jobParameters['fileName']}")
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /**
     * Creates the item reader.
     *
     * @return a {@code DummyReader} when no file name is set; otherwise a
     *     {@code FlatFileItemReader} over that file whose lines are tokenized
     *     by a {@code DelimitedLineTokenizer} and mapped by {@code CSVFieldMapper}
     */
    @Bean
    public ItemReader reader() {
        System.out.println(fileName);
        if (fileName == null) {
            return new DummyReader();
        }
        FlatFileItemReader<KafkaMessage> flatFileReader = new FlatFileItemReader<KafkaMessage>();
        // The resource only needs to be set once (the original set it twice).
        flatFileReader.setResource(new FileSystemResource(new File(fileName)));
        DefaultLineMapper<KafkaMessage> lineMapper = new DefaultLineMapper<KafkaMessage>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
        lineMapper.setFieldSetMapper(new CSVFieldMapper());
        flatFileReader.setLineMapper(lineMapper);
        return flatFileReader;
    }

    /** @return a new {@code CustomItemProcessor} */
    @Bean
    public ItemProcessor processor() {
        return new CustomItemProcessor();
    }

    /** @return a new {@code KafkaWriter} */
    @Bean
    public ItemWriter writer() {
        return new KafkaWriter();
    }

    /**
     * Builds the step named "step1" with a chunk size of 10, wiring together
     * the given reader, processor and writer.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader item reader for the step
     * @param writer item writer for the step
     * @param processor item processor for the step
     * @return the configured step
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
            ItemReader reader, ItemWriter writer,
            ItemProcessor processor) {
        System.out.println("ddddddddddddddd");
        return stepBuilderFactory.get("step1")
                .chunk(10).reader(reader)
                .processor(processor).writer(writer).build();
    }

    /** @return a {@code StepScope} with auto-proxying switched on */
    @Bean
    public StepScope stepScope() {
        final StepScope stepScope = new StepScope();
        stepScope.setAutoProxy(true);
        return stepScope;
    }

    /** @return a new {@code JobScope} */
    @Bean
    public JobScope jobScope() {
        return new JobScope();
    }
}
使用@Value("#{jobParameters[fileName]}")
代替@Value("#{jobParameters['fileName']}")
@Bean
public ItemReader reader() {
...
}
应该改为
@Bean
public ItemStreamReader reader() {
...
}
甚至更具体
@Bean
public FlatFileItemReader reader() {
...
}
步骤定义也应更改为
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemStreamReader reader, ItemWriter writer,
ItemProcessor processor) {
}
为什么有必要?
如果 itemReader 实现了 ItemStream 接口(或 ItemStreamReader),Spring 就会按 ItemStream 来处理它。
A reader, writer, or processor that is directly wired into the Step will be registered automatically if it implements ItemStream or a StepListener interface.
这意味着 Spring 会调用它的 open、update 和 close 方法。
类似问题 Spring Batch: org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read