Spring Batch:如何在我的 ItemListenerSupport 类中获取执行上下文(ExecutionContext)?

Spring Batch: How do I get the execution context into my ItemListenerSupport class?

希望这对某些人来说很简单:我希望我的 Spring Batch 应用程序大量使用 ItemListenerSupport 的“onError”方法来记录作业运行过程中遇到的所有错误,并在作业结束时将它们汇总到一封电子邮件中。但是,在步骤之间传递数据的唯一方法不就是通过 StepExecution(之后再提升到 JobExecution 中)吗?如何从 ItemListener 访问 StepExecution?这也许根本做不到,因为我确实找不到这样的例子。

如果能提供一个示例链接,或任何形式的解释,将不胜感激!

更新:这是一个 gist 链接,包含我当前的完整配置:https://gist.github.com/cnickyd/cbfc6dd39bc2e266a5d2153678b7dc1c

为此您不需要作业范围的 bean。您可以做的是让您的侦听器实现 ItemStream 并使用执行上下文来存储您从“onError”方法中需要的内容。这是一个简单的例子:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Sample Spring Batch configuration showing how an item listener can record failed items in an
 * {@link ExecutionContext} by additionally being registered as an {@link ItemStream}.
 *
 * <p>The job consists of a single fault-tolerant, chunk-oriented step that reads the integers
 * 1 to 10, rejects the odd ones in the processor, prints the remaining ones, and reports the
 * rejected items once the job has finished.
 */
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    /** Execution-context key under which the items that failed processing are stored. */
    private static final String ERROR_ITEMS_KEY = "errorItems";

    /**
     * Creates the processor used by the step.
     *
     * @return a processor that passes even numbers through unchanged and throws an
     *         {@link Exception} for odd numbers
     */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item % 2 != 0) {
                throw new Exception("no odd numbers here!");
            }
            return item;
        };
    }

    /**
     * Creates the listener that records items whose processing failed.
     *
     * @return the listener, which is registered on the step both as a listener and as a stream
     */
    @Bean
    public MyItemProcessListener itemListenerSupport() {
        return new MyItemProcessListener();
    }

    /**
     * Builds the sample job: one step with a chunk size of 5 that skips up to 10 exceptions.
     *
     * @param jobs  factory used to create the job builder
     * @param steps factory used to create the step builder
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .processor(itemProcessor())
                        .writer(items -> items.forEach(System.out::println))
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(10)
                        // Registered twice on purpose: as a listener to receive onProcessError
                        // callbacks, and as a stream to receive the execution context in open().
                        .listener(itemListenerSupport())
                        .stream(itemListenerSupport())
                        .build())
                .listener(new MyJobListener())
                .build();
    }

    /**
     * Boots the application context, runs the job once with empty parameters, and closes the
     * context afterwards.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources so the context (and the beans it owns) is always closed.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /**
     * Process listener that collects every item whose processing raised an error and stores the
     * collection in the execution context received through {@link #open(ExecutionContext)}.
     *
     * <p>Declared {@code static} because it does not use the enclosing configuration instance.
     */
    static class MyItemProcessListener implements ItemProcessListener<Integer, Integer>, ItemStream {

        /** Context handed over in {@link #open(ExecutionContext)}; holds the error list. */
        private ExecutionContext executionContext;

        /**
         * Remembers the given context and initialises it with an empty list of error items.
         *
         * @param executionContext the context in which the error items are stored
         */
        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            this.executionContext = executionContext;
            this.executionContext.put(ERROR_ITEMS_KEY, new ArrayList<Integer>());
        }

        /** No-op: the error list is written into the context as errors occur. */
        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
        }

        /** No-op: this listener holds no resources to release. */
        @Override
        public void close() throws ItemStreamException {
        }

        /** No-op. */
        @Override
        public void beforeProcess(Integer item) {
        }

        /** No-op. */
        @Override
        public void afterProcess(Integer item, Integer result) {
        }

        /**
         * Appends the failed item to the error list kept in the execution context.
         *
         * @param item the item whose processing failed
         * @param e    the exception raised by the processor (not recorded)
         */
        @Override
        public void onProcessError(Integer item, Exception e) {
            // Safe cast: open() is the only writer of this key and always stores a List<Integer>.
            @SuppressWarnings("unchecked")
            List<Integer> errorItems = (List<Integer>) executionContext.get(ERROR_ITEMS_KEY);
            errorItems.add(item);
            // Store the list again under the same key after modifying it.
            executionContext.put(ERROR_ITEMS_KEY, errorItems);
        }
    }

    /**
     * Job listener that reports the collected error items once the job has finished.
     *
     * <p>Declared {@code static} because it does not use the enclosing configuration instance.
     */
    static class MyJobListener extends JobExecutionListenerSupport {

        /**
         * Prints the error items found in the execution context of the first step execution.
         * Does nothing when the job execution has no step execution.
         *
         * @param jobExecution the finished job execution
         */
        @Override
        public void afterJob(JobExecution jobExecution) {
            // Without this guard, iterator().next() throws NoSuchElementException when the job
            // execution contains no step execution.
            if (jobExecution.getStepExecutions().isEmpty()) {
                return;
            }
            // we know there is a single step here. But in a real world scenario, you would get the execution context of the step you need, or use an ExecutionContextPromotionListener and promote the key in the job execution context
            ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
            System.out.println("Sending email with error items: " + executionContext.get(ERROR_ITEMS_KEY));
        }
    }

}

这会打印:

2
4
6
8
10
Sending email with error items: [1, 3, 5, 7, 9]