How to get readSkipCount from the StepExecution in an ItemProcessor/ItemWriter?

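I am currently reading CSV files into objects with Spring Batch, and I have to save the total number of lines as well as the number of rejected/skipped lines for the current file. Using a StepExecutionListener didn't work, because I need to get these values before the step ends, not after it. Is there a way to save them in the ItemProcessor or ItemWriter without adding another step?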

i need to get it before the step ends and not after the step

You cannot get the total number of lines until the step has finished, that is, until the whole file has been read.

using StepExecutionListener didn't work

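Using a step execution listener is the way to go. You did not share your code, so I cannot tell why it did not work for you, but here is a quick example: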

import java.util.Arrays;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("myJob")
                .start(steps.get("myStep")
                        .<Integer, Integer>chunk(2)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
                        .processor((ItemProcessor<Integer, Integer>) item -> {
                            if (item % 2 != 0) {
                                throw new Exception("No odd numbers here!");
                            }
                            return item;
                        })
                        .writer(items -> items.forEach(System.out::println))
                        // Skip (rather than fail on) up to 5 items that throw an Exception
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(5)
                        .listener(new StepExecutionListener() {
                            @Override
                            public void beforeStep(StepExecution stepExecution) {
                                System.out.println("Starting step " + stepExecution.getStepName());
                            }

                            @Override
                            public ExitStatus afterStep(StepExecution stepExecution) {
                                System.out.println("Step " + stepExecution.getStepName() + " is complete");
                                System.out.println("read.count = " + stepExecution.getReadCount());
                                System.out.println("write.count = " + stepExecution.getWriteCount());
                                System.out.println("skip.count = " + stepExecution.getSkipCount());
                                return stepExecution.getExitStatus();
                            }
                        })
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This prints:

Starting step myStep
2
4
Step myStep is complete
read.count = 4
write.count = 2
skip.count = 2
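
To illustrate why these totals are only final once the last item has been read, here is a minimal plain-Java model of the counters. It is only a sketch: SkipTally and its fields are hypothetical names, not part of Spring Batch, and it ignores chunking and transactions. It mirrors the example above by rejecting odd numbers.

```java
import java.util.List;

// Plain-Java model of the step counters; SkipTally is a hypothetical name,
// not a Spring Batch class.
final class SkipTally {
    int read;    // items pulled from the input so far
    int skipped; // items rejected so far
    int written; // items that reached the output so far

    // Mirrors the example processor: odd numbers are rejected, even ones pass.
    void accept(int item) {
        read++;
        if (item % 2 != 0) {
            skipped++;
        } else {
            written++;
        }
    }

    // Feeds every item through the tally and returns the final state.
    static SkipTally tally(List<Integer> items) {
        SkipTally t = new SkipTally();
        for (int item : items) {
            t.accept(item);
            // Mid-run snapshot: correct so far, but not yet the total for the file.
            System.out.println("so far: read=" + t.read + " skipped=" + t.skipped);
        }
        return t;
    }

    public static void main(String[] args) {
        SkipTally t = tally(List.of(1, 2, 3, 4));
        System.out.println("read.count = " + t.read);
        System.out.println("write.count = " + t.written);
        System.out.println("skip.count = " + t.skipped);
    }
}
```

Any snapshot taken inside the loop is a running value; only the state returned after the loop matches the totals printed by afterStep. In a real step, a processor or writer can obtain the StepExecution through a method annotated with @BeforeStep and read the same getters mid-run, but those values are likewise running counts and, as far as I know, are only updated at chunk boundaries.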