Error handling in multithreaded Spring Batch
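I have developed a Spring Batch application that works fine with a single thread. It is a simple batch application that reads a CSV file with a FlatFileItemReader, producing a POJO (CSVLineMapper), does some simple processing, and then writes the POJO to a repository.
Now I have made the application multithreaded using a ThreadPoolTaskExecutor. To test the framework's error handling, I throw a RuntimeException for a particular record in the processor, expecting only that particular thread to terminate and only the chunk that raised the error to be skipped. But the application terminates after the error, having written only 15 records. Why? Am I doing something wrong?
Since restartability is not supported with multithreading, how can we design a multithreaded Spring Batch application so that only the problematic records are skipped and the application continues processing the remaining records without terminating?
Please find the code snippet used below: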
public Step load() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setMaxPoolSize(5);
    threadPoolTaskExecutor.setCorePoolSize(5);
    threadPoolTaskExecutor.afterPropertiesSet();
    return stepBuilderFactory.get("load")
            .chunk(5)
            .reader(reader1())
            .processor(processor())
            .writer(writer())
            .taskExecutor(threadPoolTaskExecutor)
            .listener(stepExecutionListener)
            .listener(processListener())
            .listener(writeListener)
            .build();
}
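reader1 is a FlatFileItemReader with setSaveState set to false.
Another observation is that the log statement in the reader is called only once in the complete flow, on what is named as my main thread, whereas the processor and writer are called by different threads of the ThreadPoolTaskExecutor. Why? The reader does not implement ItemReader, but the processor and writer implement ItemProcessor and ItemWriter respectively in my case.
I have written a minimal POC to reproduce the issue. What happens is that the application terminates after 15-16 records. It never writes the rest of the records to the CSV. Person and Person1 are POJOs with firstname, lastname, and age as fields.
Person.csv has firstname, lastname, and age columns. The problem appears when there are more than 20 records. One record, say the 8th, has the firstname "zxcv" (the problematic record).
The sysout in the reader prints the current thread as the main thread. But shouldn't the reader be run by a thread in the ThreadPoolExecutor? Why is the application terminating without completing all the records?
Please find the code below: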
@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication implements ApplicationRunner {
    // Injected by Spring; without injection this field would stay null
    @Autowired
    private JobLauncher jobLauncher;
    private static ConfigurableApplicationContext ctx;
    static DemoApplication _this;

    public static void main(String... args) {
        try {
            ctx = SpringApplication.run(DemoApplication.class, args);
            _this.start(args);
            System.exit(0);
        } catch (Exception e) {
            System.exit(-1);
        }
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        _this = this;
    }

    public void start(String... args) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("user", args[0])
                .addString("repository", args[1])
                .toJobParameters();
        this.jobLauncher.run((Job) ctx.getBean("uploadJob"), jobParameters);
    }
}
@Configuration
public class BulkConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader1() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        // Backslashes must be escaped inside a Java string literal
        reader.setResource(new PathResource("C:\\sush\\setups\\demo\\src\\main\\resources\\person.csv"));
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] names = {"firstname", "lastname", "age"};
        tokenizer.setNames(names);
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        reader.setSaveState(false);
        System.out.println("Thread running in reader " + Thread.currentThread().getName());
        return reader;
    }

    @Bean
    @StepScope
    public ItemProcessor<Person, Person1> processor1() {
        return new EnrichmentProcessor();
    }

    @Bean
    @StepScope
    public ItemWriter<Person1> writer() {
        return new FileUploader();
    }

    @Bean
    public Step load() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(3);
        threadPoolTaskExecutor.setCorePoolSize(3);
        threadPoolTaskExecutor.afterPropertiesSet();
        return this.stepBuilderFactory.get("load")
                .<Person, Person1>chunk(2)
                .reader(reader1())
                .processor(processor1())
                .writer(writer())
                .taskExecutor(threadPoolTaskExecutor)
                .build();
    }

    @Bean(name = "uploadJob")
    public Job cmUploadJob() {
        return this.jobBuilderFactory.get("uploadJob")
                .start(load())
                .build();
    }
}
public class EnrichmentProcessor implements ItemProcessor<Person, Person1> {
    @Override
    public Person1 process(Person person) throws Exception {
        Person1 person1 = new Person1();
        person1.setFirstname(person.getFirstname().toUpperCase());
        person1.setLastname(person.getLastname().toUpperCase());
        person1.setAge(person.getAge());
        if (person.getFirstname().equals("zxcv")) {
            throw new RuntimeException("Error occurred in thread " + Thread.currentThread().getName());
        }
        return person1;
    }
}

public class FileUploader implements ItemWriter<Person1> {
    @Override
    public void write(List<? extends Person1> list) throws Exception {
        for (Person1 p : list) {
            System.out.println("Writing person " + p.getFirstname() + Thread.currentThread().getName());
        }
    }
}
The sysout in the reader prints the current thread as main thread. But the reader should be part of a thread in ThreadPoolExecutor?
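Because you added the System.out.println in the method that defines the Spring bean in the application context. The Spring application context is configured by the main thread. At that point, your job has not started yet and the reader has not been called. In a multithreaded step, the reader is also called by one of the task executor's threads, which you can see by adding some logging in the read method, for example: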
@Bean
@StepScope
public FlatFileItemReader<Person> reader1() {
    FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>() {
        @Override
        public Person read() throws Exception {
            System.out.println("Thread running when invoking the reader: " + Thread.currentThread().getName());
            return super.read();
        }
    };
    reader.setLinesToSkip(1);
    reader.setResource(new FileSystemResource("persons.csv"));
    DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] names = {"firstname", "lastname", "age"};
    tokenizer.setNames(names);
    BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Person.class);
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    lineMapper.afterPropertiesSet();
    reader.setLineMapper(lineMapper);
    reader.setSaveState(false);
    System.out.println("Thread running when configuring the reader bean: " + Thread.currentThread().getName());
    return reader;
}
For your example, this prints:
Thread running when configuring the reader bean: main
Thread running when invoking the reader: ThreadPoolTaskExecutor-3
Thread running when invoking the reader: ThreadPoolTaskExecutor-2
Thread running when invoking the reader: ThreadPoolTaskExecutor-1
Thread running when invoking the reader: ThreadPoolTaskExecutor-2
Thread running when invoking the reader: ThreadPoolTaskExecutor-3
Thread running when invoking the reader: ThreadPoolTaskExecutor-1
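The same distinction between configuration time and invocation time can be reproduced without Spring. The following is a minimal plain-Java sketch, not Spring Batch code: ReaderThreadDemo and RecordingReader are hypothetical names, and a fixed thread pool from java.util.concurrent stands in for the ThreadPoolTaskExecutor. The object is built on the calling thread, while its read() method is invoked from pool threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReaderThreadDemo {

    // Hypothetical stand-in for an item reader: remembers which threads touched it.
    static class RecordingReader {
        // Captured at construction time, i.e. when the "bean" is configured.
        final String configuredOn = Thread.currentThread().getName();
        // Thread-safe set of the names of all threads that called read().
        final Set<String> readCallers = ConcurrentHashMap.newKeySet();

        void read() {
            readCallers.add(Thread.currentThread().getName());
        }
    }

    static RecordingReader run() throws InterruptedException {
        // "Bean configuration": runs on whichever thread builds the object.
        RecordingReader reader = new RecordingReader();

        // "Step execution": read() is invoked from the pool's worker threads.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 6; i++) {
            pool.execute(reader::read);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return reader;
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingReader reader = run();
        System.out.println("Configured on: " + reader.configuredOn);
        System.out.println("read() called on: " + reader.readCallers);
    }
}
```

A print statement placed in the constructor (or in a bean factory method) therefore reports the configuring thread, whereas one placed inside read() reports the worker threads.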
Why the application is terminating without completing all the records?
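Because your step is not a fault-tolerant step. A simple chunk-oriented step fails at the first error. You can make it a fault-tolerant step by configuring retry/skip logic as needed, for example: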
Step step = this.stepBuilderFactory.get("load")
        .<Person, Person1>chunk(2)
        .reader(reader1())
        .processor(processor1())
        .writer(writer())
        .faultTolerant()
        .skipPolicy(new YourSkipPolicy())   // placeholder for your own SkipPolicy implementation
        .retryPolicy(new YourRetryPolicy()) // placeholder for your own RetryPolicy implementation
        .taskExecutor(threadPoolTaskExecutor)
        .build();
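For more details, see the "Configuring Skip Logic" and "Configuring Retry Logic" sections of the Spring Batch reference documentation.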
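If a custom policy is more than you need, the fault-tolerant builder also accepts an exception class and a limit directly. The following is a sketch of a replacement for the load() method in the BulkConfig class from the question, assuming Spring Batch 4.x (the StepBuilderFactory API used there). The choice to skip RuntimeException and the limit of 10 are illustrative assumptions, not values taken from the question.

```java
// Sketch only: intended to replace load() inside BulkConfig (Spring Batch 4.x assumed).
@Bean
public Step load() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(3);
    taskExecutor.setMaxPoolSize(3);
    taskExecutor.afterPropertiesSet();

    return this.stepBuilderFactory.get("load")
            .<Person, Person1>chunk(2)
            .reader(reader1())
            .processor(processor1())
            .writer(writer())
            .faultTolerant()
            .skip(RuntimeException.class) // items whose processing throws this are skipped
            .skipLimit(10)                // assumed limit; exceeding it fails the step
            .taskExecutor(taskExecutor)
            .build();
}
```

With a configuration along these lines, the record whose firstname is "zxcv" should be skipped while the other records continue to be processed. In practice it is usually better to skip a narrower, application-specific exception type than RuntimeException.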