多线程中的错误处理 Spring 批处理

Error handling in multithreaded Spring Batch

我开发了一个 Spring 批处理应用程序,它可以在单线程下正常工作。它是一个简单的批处理应用程序,使用 FlatFileItemReader 读取 csv 文件输出一个 POJO CSVLineMapper,进行简单处理,然后将 POJO 写入存储库。

现在我使用 ThreadPoolTask​​Executor 使应用程序成为多线程。为了测试框架的错误处理,我为处理器中的特定记录抛出 RuntimeException,只期望终止特定线程并仅跳过引发错误的块。 但应用程序在仅写入 15 条记录的错误后终止。为什么?我做错了什么吗?

由于多线程不支持可重启性,我们如何设计多线程 spring 批处理应用程序,以便只跳过有问题的记录,应用程序继续处理其他记录而不终止.

请找到下面使用的代码片段:

public Step load(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(5);
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.afterPropertiesSet();
stepBuilderFactory.get("load")
        .chunk(5)
        .reader(reader1())
        .processor(processor())
        .writer(writer())
        .taskExecutor(threadPoolTaskExecutor)
        .listener(stepExecutionListener)
        .listener(processListener())
        .listener(writeListener)
        .build()
}

reader1 是将 setSaveState 设置为 false 的 FlatFileItemReader。

另一个观察是 reader 中的日志在称为我的主线程的完整流程中只被调用一次。但是处理器和写入器是由 ThreadPoolTask​​Executor 的 diff 线程调用的。为什么? reader 没有实现 ItemReader,但处理器和写入器在我的例子中分别实现了 ItemProcessor 和 ItemWriter。

我写了一个 mimimal poc 来重现这个问题。发生的事情是,应用程序在 15-16 条记录后终止。它从不将其余记录写入 csv。 Person 和 Person1 是名字、姓氏、年龄作为字段的 pojo。

Person.csv 有名字、姓氏、年龄列。当超过 20 条记录时,问题就来了。有一条记录,假设第 8 条记录的名字为“zxcv”(有问题的记录)。

reader 中的 sysout 将当前线程打印为主线程。但是 reader 应该是 ThreadPoolExecutor 中线程的一部分?为什么应用程序在没有完成所有记录的情况下终止?

请在下面找到代码:

@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication implements ApplicationRunner {

    private JobLauncher jobLauncher;

    private static ConfigurableApplicationContext ctx;
    static DemoApplication _this;

    public static void main(String...args) {
            try {
                ctx = SpringApplication.run(DemoApplication.class, args);
                _this.start(args);
                System.exit(0);
            }catch(Exception e){
                System.exit(-1);
            }
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        _this=this;
    }

    public void start(String...args) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("user", args[0])
                .addString("repository", args[1])
                .toJobParameters();
        this.jobLauncher.run((Job) ctx.getBean("uploadJob"),jobParameters);
    }
}




@Configuration
public class BulkConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader1(){
        FlatFileItemReader<Person>  reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new PathResource("C:\sush\setups\demo\src\main\resources\person.csv"));

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

        String[] names = {"firstname", "lastname", "age"};
        tokenizer.setNames(names);

        BeanWrapperFieldSetMapper<Person>  fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
        reader.setSaveState(false);
        System.out.println("Thread running in reader "+ Thread.currentThread().getName());
        return reader;

    }

    @Bean
    @StepScope
    public ItemProcessor<Person,Person1> processor1(){
        return new EnrichmentProcessor();
    }

    @Bean
    @StepScope
    public ItemWriter<Person1> writer(){
        return new FileUploader();
    }


    @Bean
    public Step load(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(3);
        threadPoolTaskExecutor.setCorePoolSize(3);
        threadPoolTaskExecutor.afterPropertiesSet();

        return this.stepBuilderFactory.get("load")
                .<Person,Person1>chunk(2)
                .reader(reader1())
                .processor(processor1())
                .writer(writer())
                .taskExecutor(threadPoolTaskExecutor)
                .build();
    }


    @Bean(name = "uploadJob")
    public Job cmUploadJob(){
        return this.jobBuilderFactory.get("uploadJob")
                .start(load())
                .build();
    }
}



public class EnrichmentProcessor implements ItemProcessor<Person, Person1> {
    @Override
    public Person1 process(Person person) throws Exception {
        Person1 person1 = new Person1();
        person1.setFirstname(person.getFirstname().toUpperCase());
        person1.setLastname(person.getLastname().toUpperCase());
        person1.setAge(person.getAge());

        if(person.getFirstname().equals("zxcv")){
            throw new RuntimeException("Error occurred in thread " + Thread.currentThread().getName());
        }

        return person1;
    }
}


public class FileUploader implements ItemWriter<Person1> {
    @Override
    public void write(List<? extends Person1> list) throws Exception {
        for(Person1 p : list){
            System.out.println("Writing person "+ p.getFirstname() + Thread.currentThread().getName());
        }
    }
}

The sysout in the reader prints the current thread as main thread. But the reader should be part of a thread in ThreadPoolExecutor?

因为您在应用程序上下文中定义 Spring bean 的方法中添加了 System.out.println。 Spring 应用程序上下文由 main 线程配置。那时,您的工作尚未开始并且 reader 尚未被调用。在多线程步骤中,reader 也应该被任务执行器的线程之一调用,您可以通过在 read 方法中添加一些日志记录来查看,例如:

@Bean
@StepScope
public FlatFileItemReader<Person> reader1() {
    FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>() {
        @Override
        public Person read() throws Exception {
            System.out.println("Thread running when invoking the reader: " + Thread.currentThread().getName());
            return super.read();
        }
    };
    reader.setLinesToSkip(1);
    reader.setResource(new FileSystemResource("persons.csv"));

    DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

    String[] names = {"firstname", "lastname", "age"};
    tokenizer.setNames(names);

    BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Person.class);

    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    lineMapper.afterPropertiesSet();

    reader.setLineMapper(lineMapper);
    reader.setSaveState(false);
    System.out.println("Thread running when configuring the reader bean: " + Thread.currentThread().getName());
    return reader;

}

对于你的例子,这会打印:

Thread running when configuring the reader bean: main
Thread running when invoking the reader: ThreadPoolTaskExecutor-3
Thread running when invoking the reader: ThreadPoolTaskExecutor-2
Thread running when invoking the reader: ThreadPoolTaskExecutor-1
Thread running when invoking the reader: ThreadPoolTaskExecutor-2
Thread running when invoking the reader: ThreadPoolTaskExecutor-3
Thread running when invoking the reader: ThreadPoolTaskExecutor-1

Why the application is terminating without completing all the records?

因为你的步骤不是容错步骤。一个简单的面向块的步骤在第一个错误时失败。您可以根据需要将 retry/skip 逻辑配置为容错步骤,例如:

Step step = this.stepBuilderFactory.get("load")
                .<Person, Person1>chunk(2)
                .reader(reader1())
                .processor(processor1())
                .writer(writer())
                .faultTolerant()
                .skipPolicy(new YourSkipPoilcy())
                .retryPolicy(new YourRetryPolicy)
                .taskExecutor(threadPoolTaskExecutor)
                .build();

请参考reference documentation

中的“配置跳过逻辑”和“配置重试逻辑”部分