Null Pointer Exception on passing the value between steps in Spring Batch
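I am learning Spring Batch and working on a sample program in which I need to pass values from one step to another.
Scenario: I have a Person table from which I extract person details, save a few columns to a DTO (in the ItemWriter for step 1), and pass the values from the DTO to the where clause of a query on a different table to pull the related values (in the ItemReader for step 2). Finally, I generate a CSV containing all of these values.
Here is my code: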
@Bean
public Job job() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
    return jobBuilderFactory.get("readDBJob").incrementer(new RunIdIncrementer()).start(step1()).next(step2())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1").<Person, Person>chunk(500000).reader(itemReader())
            .writer(itemWriter()).listener(promotionListener()).build();
}

@Bean
public Step step2() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
    return stepBuilderFactory.get("step2").<Person, Result>chunk(100)
            .reader(readingObjectItemReader.cursorReader()).writer(itemWriterForStep2()).build();
}
ItemWriter for step 1:
@Bean
public ItemWriter<Person> itemWriter() {
    return new ItemWriter<Person>() {
        private StepExecution stepExecution;
        List<personDTO> responseList = null;

        @Override
        public void write(List<? extends Person> items) throws Exception {
            for (Person item : items) {
                personDTO responseObject = new personDTO();
                BeanUtils.copyProperties(item, responseObject);
                if (responseObject != null && responseObject.getPersonId() != null) {
                    if (stepExecution.getExecutionContext().containsKey("personDtoObject")) {
                        responseList = (List<personDTO>) this.stepExecution.getExecutionContext().get("personDtoObject");
                    }
                    responseList.add(responseObject);
                    this.stepExecution.getExecutionContext().put("personDtoObject", responseList);
                }
            }
        }

        @BeforeStep
        public void saveStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
            this.stepExecution.getExecutionContext().put("personDtoObject", new ArrayList<>());
        }
    };
}
Job execution context:
@Bean
public Object promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"personDtoObject"});
    listener.setStrict(true);
    return listener;
}
This is how I am trying to access the values in the ItemReader of step 2:
public class ReadingObjectItemReader implements ItemReader {

    @Autowired
    DataSource dataSource;

    private List<personDTO> personDtoList;
    String value;

    @Override
    public personDetails read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return null;
    }

    @Bean
    public JdbcCursorItemReader<personDetails> cursorReader() {
        System.out.println("Values from the step 1 " + personDtoList);
        ....
    }

    @BeforeStep
    public void retrieveSharedData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        personDtoList = (List<personDTO>) jobContext.get("personDtoObject");
    }
}
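When I try to access personDtoList in step 2, I get null. I verified the values in the step context just before step 1 completed, and everything looked fine up to that point, but when I try to access them in step 2 I get null.
I have gone through most of the resources available online, but I cannot figure out where I am going wrong. Any help is appreciated.
Thanks in advance for your help.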
In the item writer of step 1, you are doing:
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("personDtoObject", responseList);
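This means you overwrite the previous list on every chunk. What you need to do is get the list from the execution context and add the items to it before overwriting the key. You also need some sanity checks at the step boundaries (i.e. the first chunk and the last chunk) to make sure the list is initialized and is not null before putting it in the execution context (especially for the last chunk).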
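The get-then-append pattern described above can be sketched in plain Java. This is only an illustration: a HashMap stands in for the step's ExecutionContext, and the class ChunkAccumulator and its appendChunk helper are hypothetical names, not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: a HashMap stands in for the step's ExecutionContext.
final class ChunkAccumulator {

    // Hypothetical helper: appends one chunk to the list stored under `key`,
    // creating the list on first use so it is never null.
    @SuppressWarnings("unchecked")
    static <T> void appendChunk(Map<String, Object> context, String key, List<? extends T> chunk) {
        List<T> accumulated = (List<T>) context.get(key);
        if (accumulated == null) {
            accumulated = new ArrayList<>();
        }
        accumulated.addAll(chunk);
        context.put(key, accumulated);
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        appendChunk(context, "personDtoObject", List.of("p1", "p2")); // first chunk
        appendChunk(context, "personDtoObject", List.of("p3"));       // last chunk
        System.out.println(context.get("personDtoObject")); // prints [p1, p2, p3]
    }
}
```

Because the list is fetched (or created) before every append, no chunk replaces what an earlier chunk stored, and the value put back under the key is never null.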
Edit: adding the code change required for the promotion listener to work
You also need to change the return type of the promotionListener() method from Object to ExecutionContextPromotionListener:
@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"personDtoObject"});
    listener.setStrict(true);
    return listener;
}
Otherwise, this bean is not correctly registered as a listener. Here is a complete example:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;

    public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {
            private StepExecution stepExecution;

            @Override
            public void write(List<? extends Integer> items) {
                // Fetch the list created in saveStepExecution and append to it
                List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                for (Integer item : items) {
                    System.out.println("item = " + item);
                    itemsList.add(item);
                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
                // Initialize the list once, before the first chunk
                this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    // Read the promoted list from the job execution context
                    ExecutionContext executionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
                    List<Integer> items = (List<Integer>) executionContext.get("items");
                    System.out.println("Items read in step1:");
                    for (Integer item : items) {
                        System.out.println("item = " + item);
                    }
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[]{"items"});
        listener.setStrict(true);
        return listener;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
}
This prints:
item = 1
item = 2
item = 3
item = 4
Items read in step1:
item = 1
item = 2
item = 3
item = 4
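As for why the declared return type of promotionListener() can matter: a likely reason, stated here as an assumption rather than taken from the Spring Batch source, is that the step builder has more than one listener(...) overload and Java selects the overload at compile time from the declared type of the argument. The sketch below uses hypothetical stand-in classes (StepBuilderSketch, StepListener, PromotionListener), not Spring Batch's real ones, to show that language rule.

```java
// Hypothetical stand-ins; these are not Spring Batch's real classes.
final class StepBuilderSketch {

    // Overload selected when the argument's declared type is only Object.
    static String listener(Object candidate) {
        return "Object overload";
    }

    // Overload selected when the argument's declared type is a StepListener.
    static String listener(StepListener candidate) {
        return "StepListener overload";
    }

    public static void main(String[] args) {
        Object declaredAsObject = new PromotionListener();
        PromotionListener declaredAsListener = new PromotionListener();
        System.out.println(listener(declaredAsObject));   // prints "Object overload"
        System.out.println(listener(declaredAsListener)); // prints "StepListener overload"
    }
}

interface StepListener { }

final class PromotionListener implements StepListener { }
```

Both variables hold the same kind of object, yet a different method is called depending on how the variable is declared, which is consistent with the fix of declaring the concrete listener type on the bean method.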
Edit 2: adding an example with a chunk-oriented step
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;

    public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {
            private StepExecution stepExecution;

            @Override
            public void write(List<? extends Integer> items) {
                // Fetch the list created in saveStepExecution and append to it
                List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                for (Integer item : items) {
                    System.out.println("item = " + item);
                    itemsList.add(item);
                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
                // Initialize the list once, before the first chunk
                this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<Integer, Integer>chunk(2)
                .reader(new ReadingObjectItemReader())
                .writer(items -> items.forEach((Consumer<Integer>) integer -> System.out.println("integer = " + integer)))
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[]{"items"});
        listener.setStrict(true);
        return listener;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .next(step2())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    public static class ReadingObjectItemReader implements ItemReader<Integer> {

        int i = 0;
        private List<Integer> items;

        @Override
        public Integer read() {
            // Returning null signals the end of the input to the step
            if (i >= items.size()) {
                return null;
            } else {
                return items.get(i++);
            }
        }

        @BeforeStep
        public void retrieveSharedData(StepExecution stepExecution) {
            // Runs before step 2 starts reading, after step 1 promoted the key
            JobExecution jobExecution = stepExecution.getJobExecution();
            ExecutionContext jobContext = jobExecution.getExecutionContext();
            items = (List<Integer>) jobContext.get("items");
        }
    }
}
This prints:
item = 1
item = 2
item = 3
item = 4
integer = 1
integer = 2
integer = 3
integer = 4
This means the list is correctly retrieved from the job execution context in step 2, which is a chunk-oriented step.
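One difference from the question's code is worth noting: this reader only touches the list inside read(), after its @BeforeStep method has run, whereas the question calls cursorReader() while the step2 bean is being built. The plain-Java sketch below uses hypothetical names (LazyReaderSketch, describeAtConfigurationTime, beforeStep) to show why that timing matters. Whether it fully explains the null seen in the question is an assumption.

```java
import java.util.List;

// Plain-Java sketch; beforeStep() stands in for a @BeforeStep callback.
final class LazyReaderSketch {

    private List<Integer> items; // stays null until beforeStep() runs
    private int index = 0;

    // Stand-in for code that runs while the job is being configured,
    // i.e. before any step has executed.
    String describeAtConfigurationTime() {
        return "items = " + items;
    }

    // Stand-in for the @BeforeStep method: receives the promoted list.
    void beforeStep(List<Integer> promotedItems) {
        this.items = promotedItems;
    }

    // Called during the step, after beforeStep(); returns null when exhausted.
    Integer read() {
        return index < items.size() ? items.get(index++) : null;
    }

    public static void main(String[] args) {
        LazyReaderSketch reader = new LazyReaderSketch();
        System.out.println(reader.describeAtConfigurationTime()); // prints "items = null"
        reader.beforeStep(List.of(1, 2));
        System.out.println(reader.read()); // prints 1
        System.out.println(reader.read()); // prints 2
        System.out.println(reader.read()); // prints null
    }
}
```

Anything that inspects the shared list before the callback has fired sees null, so code that depends on values from step 1 should run inside read() or later, not while the beans are being wired.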