Spring Batch:如何在我的 ItemListenerSupport 类中获取执行上下文?
Spring Batch: How do I get the execution context into my ItemListenerSupport class?
希望这对某些人来说是个简单的问题:我希望我的 Spring Batch 应用程序大量使用 ItemListenerSupport 的“onError”方法来跟踪作业中遇到的所有错误,并在作业结束时把它们汇总到一封电子邮件中。但是,在步骤之间传递数据的唯一方式不就是通过 StepExecution(之后再提升到 JobExecution 中)吗?如何从 ItemListener 访问 StepExecution?这也许根本做不到,因为我确实找不到任何这样的例子。
如果能提供一个示例链接,或者任何形式的解释,我将不胜感激!
更新:这是一个 gist 链接,包含我当前的完整配置:https://gist.github.com/cnickyd/cbfc6dd39bc2e266a5d2153678b7dc1c
为此您不需要作业范围(job-scoped)的 bean。您可以让侦听器实现 ItemStream 接口,并使用执行上下文来存储您在“onError”方法中需要记录的内容。下面是一个简单的例子:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Minimal Spring Batch sample: items whose processing fails are collected in the step
 * execution context and reported once the job has finished.
 *
 * <p>The single step reads the integers 1 to 10 in chunks of five, rejects odd numbers in
 * the processor, skips those failures and prints the surviving items. The item listener is
 * registered both as a listener and as a stream so that it receives the step's
 * {@link ExecutionContext} through {@link ItemStream#open(ExecutionContext)}.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /** Execution-context key under which the list of failed items is stored. */
    private static final String ERROR_ITEMS_KEY = "errorItems";

    /**
     * Creates a processor that passes even numbers through unchanged and fails on odd ones.
     *
     * @return the item processor used by the step
     */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item % 2 != 0) {
                throw new Exception("no odd numbers here!");
            }
            return item;
        };
    }

    /**
     * Creates the listener that records failed items in the step execution context.
     *
     * @return the listener, which must also be registered on the step as a stream
     */
    @Bean
    public MyItemProcessListener itemListenerSupport() {
        return new MyItemProcessListener();
    }

    /**
     * Builds the sample job consisting of one fault-tolerant chunk-oriented step.
     *
     * @param jobs factory used to create the job builder
     * @param steps factory used to create the step builder
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .processor(itemProcessor())
                        .writer(items -> items.forEach(System.out::println))
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(10)
                        // Same bean registered twice on purpose: once for the error callback,
                        // once so that open(ExecutionContext) is invoked on it.
                        .listener(itemListenerSupport())
                        .stream(itemListenerSupport())
                        .build())
                .listener(new MyJobListener())
                .build();
    }

    /**
     * Boots the application context, launches the job once and closes the context.
     *
     * @param args ignored
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context so its beans are released on exit.
        // NOTE(review): assumes the configured JobLauncher runs the job synchronously,
        // so the job has finished before the context is closed.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Process listener that appends every item whose processing failed to a list kept in
     * the execution context handed to {@link #open(ExecutionContext)}.
     */
    static class MyItemProcessListener implements ItemProcessListener<Integer, Integer>, ItemStream {

        private ExecutionContext executionContext;

        /**
         * Remembers the execution context and makes sure it holds an error-item list.
         *
         * @param executionContext the context to store failed items in
         */
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            this.executionContext = executionContext;
            // Keep a list that is already present instead of overwriting it with an empty one.
            if (!executionContext.containsKey(ERROR_ITEMS_KEY)) {
                executionContext.put(ERROR_ITEMS_KEY, new ArrayList<Integer>());
            }
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            // Nothing to do: the list is written to the context as errors occur.
        }

        @Override
        public void close() throws ItemStreamException {
            // No resources to release.
        }

        @Override
        public void beforeProcess(Integer item) {
            // Not needed for error tracking.
        }

        @Override
        public void afterProcess(Integer item, Integer result) {
            // Not needed for error tracking.
        }

        /**
         * Records the item whose processing failed.
         *
         * @param item the item that could not be processed
         * @param e the failure raised by the processor
         * @throws IllegalStateException if {@link #open(ExecutionContext)} has not been called
         */
        @Override
        public void onProcessError(Integer item, Exception e) {
            if (executionContext == null) {
                throw new IllegalStateException(
                        "No execution context available; register this listener on the step as a stream");
            }
            // Safe cast: this class is the only writer of ERROR_ITEMS_KEY in this file.
            @SuppressWarnings("unchecked")
            List<Integer> errorItems = (List<Integer>) executionContext.get(ERROR_ITEMS_KEY);
            if (errorItems == null) {
                errorItems = new ArrayList<>();
            }
            errorItems.add(item);
            // Store the list back under the same key.
            executionContext.put(ERROR_ITEMS_KEY, errorItems);
        }
    }

    /** Job listener that reports the collected error items after the job has run. */
    static class MyJobListener extends JobExecutionListenerSupport {

        /**
         * Prints the error items found in the execution context of the first step execution.
         *
         * @param jobExecution the finished job execution
         */
        @Override
        public void afterJob(JobExecution jobExecution) {
            // Without any step execution there is no step context to read from.
            if (jobExecution.getStepExecutions().isEmpty()) {
                return;
            }
            // We know there is a single step here. In a real-world scenario you would pick the
            // execution context of the step you need, or use an ExecutionContextPromotionListener
            // to promote the key into the job execution context.
            ExecutionContext executionContext =
                    jobExecution.getStepExecutions().iterator().next().getExecutionContext();
            System.out.println("Sending email with error items: " + executionContext.get(ERROR_ITEMS_KEY));
        }
    }
}
这会打印:
2
4
6
8
10
Sending email with error items: [1, 3, 5, 7, 9]
希望这对某些人来说是个简单的问题:我希望我的 Spring Batch 应用程序大量使用 ItemListenerSupport 的“onError”方法来跟踪作业中遇到的所有错误,并在作业结束时把它们汇总到一封电子邮件中。但是,在步骤之间传递数据的唯一方式不就是通过 StepExecution(之后再提升到 JobExecution 中)吗?如何从 ItemListener 访问 StepExecution?这也许根本做不到,因为我确实找不到任何这样的例子。
如果能提供一个示例链接,或者任何形式的解释,我将不胜感激!
更新:这是一个 gist 链接,包含我当前的完整配置:https://gist.github.com/cnickyd/cbfc6dd39bc2e266a5d2153678b7dc1c
为此您不需要作业范围(job-scoped)的 bean。您可以让侦听器实现 ItemStream 接口,并使用执行上下文来存储您在“onError”方法中需要记录的内容。下面是一个简单的例子:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Minimal Spring Batch sample: items whose processing fails are collected in the step
 * execution context and reported once the job has finished.
 *
 * <p>The single step reads the integers 1 to 10 in chunks of five, rejects odd numbers in
 * the processor, skips those failures and prints the surviving items. The item listener is
 * registered both as a listener and as a stream so that it receives the step's
 * {@link ExecutionContext} through {@link ItemStream#open(ExecutionContext)}.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /** Execution-context key under which the list of failed items is stored. */
    private static final String ERROR_ITEMS_KEY = "errorItems";

    /**
     * Creates a processor that passes even numbers through unchanged and fails on odd ones.
     *
     * @return the item processor used by the step
     */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item % 2 != 0) {
                throw new Exception("no odd numbers here!");
            }
            return item;
        };
    }

    /**
     * Creates the listener that records failed items in the step execution context.
     *
     * @return the listener, which must also be registered on the step as a stream
     */
    @Bean
    public MyItemProcessListener itemListenerSupport() {
        return new MyItemProcessListener();
    }

    /**
     * Builds the sample job consisting of one fault-tolerant chunk-oriented step.
     *
     * @param jobs factory used to create the job builder
     * @param steps factory used to create the step builder
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .processor(itemProcessor())
                        .writer(items -> items.forEach(System.out::println))
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(10)
                        // Same bean registered twice on purpose: once for the error callback,
                        // once so that open(ExecutionContext) is invoked on it.
                        .listener(itemListenerSupport())
                        .stream(itemListenerSupport())
                        .build())
                .listener(new MyJobListener())
                .build();
    }

    /**
     * Boots the application context, launches the job once and closes the context.
     *
     * @param args ignored
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context so its beans are released on exit.
        // NOTE(review): assumes the configured JobLauncher runs the job synchronously,
        // so the job has finished before the context is closed.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Process listener that appends every item whose processing failed to a list kept in
     * the execution context handed to {@link #open(ExecutionContext)}.
     */
    static class MyItemProcessListener implements ItemProcessListener<Integer, Integer>, ItemStream {

        private ExecutionContext executionContext;

        /**
         * Remembers the execution context and makes sure it holds an error-item list.
         *
         * @param executionContext the context to store failed items in
         */
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            this.executionContext = executionContext;
            // Keep a list that is already present instead of overwriting it with an empty one.
            if (!executionContext.containsKey(ERROR_ITEMS_KEY)) {
                executionContext.put(ERROR_ITEMS_KEY, new ArrayList<Integer>());
            }
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            // Nothing to do: the list is written to the context as errors occur.
        }

        @Override
        public void close() throws ItemStreamException {
            // No resources to release.
        }

        @Override
        public void beforeProcess(Integer item) {
            // Not needed for error tracking.
        }

        @Override
        public void afterProcess(Integer item, Integer result) {
            // Not needed for error tracking.
        }

        /**
         * Records the item whose processing failed.
         *
         * @param item the item that could not be processed
         * @param e the failure raised by the processor
         * @throws IllegalStateException if {@link #open(ExecutionContext)} has not been called
         */
        @Override
        public void onProcessError(Integer item, Exception e) {
            if (executionContext == null) {
                throw new IllegalStateException(
                        "No execution context available; register this listener on the step as a stream");
            }
            // Safe cast: this class is the only writer of ERROR_ITEMS_KEY in this file.
            @SuppressWarnings("unchecked")
            List<Integer> errorItems = (List<Integer>) executionContext.get(ERROR_ITEMS_KEY);
            if (errorItems == null) {
                errorItems = new ArrayList<>();
            }
            errorItems.add(item);
            // Store the list back under the same key.
            executionContext.put(ERROR_ITEMS_KEY, errorItems);
        }
    }

    /** Job listener that reports the collected error items after the job has run. */
    static class MyJobListener extends JobExecutionListenerSupport {

        /**
         * Prints the error items found in the execution context of the first step execution.
         *
         * @param jobExecution the finished job execution
         */
        @Override
        public void afterJob(JobExecution jobExecution) {
            // Without any step execution there is no step context to read from.
            if (jobExecution.getStepExecutions().isEmpty()) {
                return;
            }
            // We know there is a single step here. In a real-world scenario you would pick the
            // execution context of the step you need, or use an ExecutionContextPromotionListener
            // to promote the key into the job execution context.
            ExecutionContext executionContext =
                    jobExecution.getStepExecutions().iterator().next().getExecutionContext();
            System.out.println("Sending email with error items: " + executionContext.get(ERROR_ITEMS_KEY));
        }
    }
}
这会打印:
2
4
6
8
10
Sending email with error items: [1, 3, 5, 7, 9]