将 ThreadPoolTaskExecutor 与 FlatFileItemReader 和 AsyncItemProcessor + Writer 一起使用是线程安全的吗?
Is it thread-safe to use a ThreadPoolTaskExecutor with a FlatFileItemReader and AsyncItemProcessor + Writer?
我的工作从 csv 文件(最多包含大约 65,000 行)读取数据,处理数据,然后写入数据库。以下是我的配置--
@Bean
public Job csvFileLoadJob() throws Exception {
return jobBuilderFactory.get("csvFileLoadJob")
.preventRestart()
.start(csvFileLoadStep())
.build();
}
@Bean
public Step csvFileLoadStep() throws Exception {
return this.stepBuilderFactory.get("csvFileLoadStep")
.transactionManager(myTransactionManager)
.<Map<String, String>, MyPojo>chunk(1000)
.reader(csvFileReader(null))
.processor(asyncCsvFileProcessor())
.writer(asyncTableWriter())
.faultTolerant()
.skipPolicy(fileSkipHandler())
.throttleLimit(5)
.taskExecutor(csvFileLoadThreadPoolExecutor)
.build();
}
reader 将每一行读入地图(通过自定义 FieldSetMapper
)。编写器调用 JpaRepository 将转换后的实体 (MyPojo
) 写入数据库。
配置是线程安全的吗?
在 Spring 元数据 table 中,我可以看到读取计数与文件的行数相匹配。但是,目标 table 计数——正确插入目标 table 的记录 + 记录到单独 table 的异常——远远超过行数。
是什么导致了这种差异?
我想通了 -- 提交答案以防其他人遇到同样的问题。
事实证明,faultTolerant()
是重复异常日志的幕后黑手。在失败的情况下,回滚块并重试以隔离坏记录。
我在 fileSkipHandler()
中记录自定义异常,因此回滚 + 重试会导致重复的异常日志。我能够通过实施 @OnSkipInProcess
/@OnSkipInWrite
方法来处理异常日志记录来解决这个问题。
使用了this个线程作为参考
我的工作从 csv 文件(最多包含大约 65,000 行)读取数据,处理数据,然后写入数据库。以下是我的配置--
@Bean
public Job csvFileLoadJob() throws Exception {
return jobBuilderFactory.get("csvFileLoadJob")
.preventRestart()
.start(csvFileLoadStep())
.build();
}
@Bean
public Step csvFileLoadStep() throws Exception {
return this.stepBuilderFactory.get("csvFileLoadStep")
.transactionManager(myTransactionManager)
.<Map<String, String>, MyPojo>chunk(1000)
.reader(csvFileReader(null))
.processor(asyncCsvFileProcessor())
.writer(asyncTableWriter())
.faultTolerant()
.skipPolicy(fileSkipHandler())
.throttleLimit(5)
.taskExecutor(csvFileLoadThreadPoolExecutor)
.build();
}
reader 将每一行读入地图(通过自定义 FieldSetMapper
)。编写器调用 JpaRepository 将转换后的实体 (MyPojo
) 写入数据库。
配置是线程安全的吗?
在 Spring 元数据 table 中,我可以看到读取计数与文件的行数相匹配。但是,目标 table 计数——正确插入目标 table 的记录 + 记录到单独 table 的异常——远远超过行数。
是什么导致了这种差异?
我想通了 -- 提交答案以防其他人遇到同样的问题。
事实证明,faultTolerant()
是重复异常日志的幕后黑手。在失败的情况下,回滚块并重试以隔离坏记录。
我在 fileSkipHandler()
中记录自定义异常,因此回滚 + 重试会导致重复的异常日志。我能够通过实施 @OnSkipInProcess
/@OnSkipInWrite
方法来处理异常日志记录来解决这个问题。
使用了this个线程作为参考