将 ThreadPoolTask​​Executor 与 FlatFileItemReader 和 AsyncItemProcessor + Writer 一起使用是线程安全的吗?

Is it thread-safe to use a ThreadPoolTaskExecutor with a FlatFileItemReader and AsyncItemProcessor + Writer?

我的工作从 csv 文件(最多包含大约 65,000 行)读取数据,处理数据,然后写入数据库。以下是我的配置--

    @Bean
    public Job csvFileLoadJob() throws Exception {
        return jobBuilderFactory.get("csvFileLoadJob")
                .preventRestart()
                .start(csvFileLoadStep())
                .build();
    }

    @Bean
    public Step csvFileLoadStep() throws Exception {
        return this.stepBuilderFactory.get("csvFileLoadStep")
                .transactionManager(myTransactionManager)
                .<Map<String, String>, MyPojo>chunk(1000)
                .reader(csvFileReader(null))
                .processor(asyncCsvFileProcessor())
                .writer(asyncTableWriter())
                .faultTolerant()
                .skipPolicy(fileSkipHandler())
                .throttleLimit(5)
                .taskExecutor(csvFileLoadThreadPoolExecutor)
                .build();
    }

reader 将每一行读入地图(通过自定义 FieldSetMapper)。编写器调用 JpaRepository 将转换后的实体 (MyPojo) 写入数据库。

配置是线程安全的吗?

在 Spring 元数据 table 中,我可以看到读取计数与文件的行数相匹配。但是,目标 table 计数——正确插入目标 table 的记录 + 记录到单独 table 的异常——远远超过行数。

是什么导致了这种差异?

我想通了 -- 提交答案以防其他人遇到同样的问题。 事实证明,faultTolerant() 是重复异常日志的幕后黑手。在失败的情况下,回滚块并重试以隔离坏记录。

我在 fileSkipHandler() 中记录自定义异常,因此回滚 + 重试会导致重复的异常日志。我能够通过实施 @OnSkipInProcess/@OnSkipInWrite 方法来处理异常日志记录来解决这个问题。

使用了this个线程作为参考