如何从 spring 引导 Web 应用程序中的 Spring 批处理作业的 ItemReader class 访问作业参数

How to access jobparameter from ItemReader class of Spring batch job in spring boot web application

我正在从我的控制器启动一个 spring 批处理作业。我需要通过基于 java 的 configuration.But 从配置的自定义 ItemReader 访问作业参数 fileName 我总是从我的 ItemReader 中获取空文件名。请帮忙。 我这样启动是因为我必须根据用户请求更改 ItemReader 文件。

我的控制器:

@RestController
@RequestMapping(value = "/")
public class SimpleController {


@Autowired
Step step;

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

@Autowired
private JobRepository jobRepository;

@Autowired
JobLauncher jobLauncher;

@RequestMapping(method = RequestMethod.GET)
public void StartJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
    JobParameter param = new JobParameter("ValueBeingSetFromRequestHeader");
    HashMap map = new HashMap<String, JobParameter>();
    map.put("fileName",param);
    SimpleJob job = new SimpleJob("test");
    job.addStep(step);
    System.out.println(jobRepository);
    job.setJobRepository(jobRepository);

    System.out.println( jobRepository.getLastJobExecution(job.getName(), new JobParameters(map)));

    jobLauncher.run(job, new JobParameters(map));

}

}

而我的作业步骤配置 class 是

public class ProducerBatchJobConfigurer {


private String fileName;

public String getFileName() {
    return fileName;
}
@Value("#{jobParameters['fileName']}")
public void setFileName(String fileName) {
    this.fileName = fileName;
}

@Bean
public ItemReader reader() {
    System.out.println(fileName);
    if (fileName == null)
        return new DummyReader();
    FlatFileItemReader faltFileReader =  new FlatFileItemReader();

    faltFileReader.setResource(new FileSystemResource(new File(fileName)));
    faltFileReader.setResource(new FileSystemResource(new File(fileName)));
    DefaultLineMapper<KafkaMessage> lineMapper = new DefaultLineMapper<KafkaMessage>();
    lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
    lineMapper.setFieldSetMapper(new CSVFieldMapper());
    faltFileReader.setLineMapper(lineMapper);
    return faltFileReader;
}

@Bean
public ItemProcessor processor() {
   return new CustomItemProcessor();

}

@Bean
public ItemWriter writer() {
    return new KafkaWriter();
}



@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
                  ItemReader reader, ItemWriter writer,
                  ItemProcessor processor) {
    System.out.println("ddddddddddddddd");
    return stepBuilderFactory.get("step1")
            .chunk(10).reader(reader)
            .processor(processor).writer(writer).build();
}
@Bean
public StepScope stepScope() {
    final StepScope stepScope = new StepScope();
    stepScope.setAutoProxy(true);
    return stepScope;
}
@Bean
public JobScope jobScope(){
    return new JobScope();
}
}

使用@Value("#{jobParameters[fileName]}")代替@Value("#{jobParameters['fileName']}")

@Bean
public ItemReader reader() {
 ...
}

应该改为

@Bean
public ItemStreamReader reader() {
 ...
}

甚至更具体

@Bean
public FlatFileItemReader reader() {
 ...
}

步骤定义也应更改为

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
                  ItemStreamReader reader, ItemWriter writer,
                  ItemProcessor processor) {

}

为什么有必要?

如果 itemReader 实现了 ItemStream Interface (or ItemStreamReader) Spring 将同样对待它。

https://docs.spring.io/spring-batch/reference/html/readersAndWriters.html#delegatePatternAndRegistering

A reader, writer, or processor that is directly wired into the Step will be registered automatically if it implements ItemStream or a StepListener interface.

这意味着 spring 将调用打开、更新和关闭方法

类似问题 Spring Batch: org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read