如何从 itemProcessor/itemWriter 中的步骤执行中获取 readSkipCount?
how to get readSkipCount from Step Execution in itemProcessor/itemWriter?
我目前正在将 spring 批处理的 csv 文件读取到对象中,我必须在其中保存总行数以及当前文件的 rejected/skipped 行数,并且使用 StepExecutionListener 不起作用,因为我需要在 step 结束之前而不是在 step 之后得到它。有没有一种方法可以将它们保存在 itemProcessor 或 itemWriter 中而无需添加其他步骤?
i need to get it before the step ends and not after the step
你不能得到总行数直到步骤结束(即读取整个文件)。
using StepExecutionListener didn't work
使用步骤执行侦听器是可行的方法。你没有分享你的代码来了解为什么这对你不起作用,但这里有一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("myJob")
.start(steps.get("myStep")
.<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
.processor((ItemProcessor<Integer, Integer>) item -> {
if (item % 2 != 0) {
throw new Exception("No odd numbers here!");
}
return item;
})
.writer(items -> items.forEach(System.out::println))
.faultTolerant()
.skip(Exception.class)
.skipLimit(5)
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Starting step " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step "+ stepExecution.getStepName() + " is complete");
System.out.println("read.count = " + stepExecution.getReadCount());
System.out.println("write.count = " + stepExecution.getWriteCount());
System.out.println("skip.count = " + stepExecution.getSkipCount());
return stepExecution.getExitStatus();
}
})
.build())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
这会打印:
Starting step myStep
2
4
Step myStep is complete
read.count = 4
write.count = 2
skip.count = 2
我目前正在将 spring 批处理的 csv 文件读取到对象中,我必须在其中保存总行数以及当前文件的 rejected/skipped 行数,并且使用 StepExecutionListener 不起作用,因为我需要在 step 结束之前而不是在 step 之后得到它。有没有一种方法可以将它们保存在 itemProcessor 或 itemWriter 中而无需添加其他步骤?
i need to get it before the step ends and not after the step
你不能得到总行数直到步骤结束(即读取整个文件)。
using StepExecutionListener didn't work
使用步骤执行侦听器是可行的方法。你没有分享你的代码来了解为什么这对你不起作用,但这里有一个简单的例子:
import java.util.Arrays;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("myJob")
.start(steps.get("myStep")
.<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
.processor((ItemProcessor<Integer, Integer>) item -> {
if (item % 2 != 0) {
throw new Exception("No odd numbers here!");
}
return item;
})
.writer(items -> items.forEach(System.out::println))
.faultTolerant()
.skip(Exception.class)
.skipLimit(5)
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Starting step " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step "+ stepExecution.getStepName() + " is complete");
System.out.println("read.count = " + stepExecution.getReadCount());
System.out.println("write.count = " + stepExecution.getWriteCount());
System.out.println("skip.count = " + stepExecution.getSkipCount());
return stepExecution.getExitStatus();
}
})
.build())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
这会打印:
Starting step myStep
2
4
Step myStep is complete
read.count = 4
write.count = 2
skip.count = 2