我可以将 FlatfileItemReader 与 Taskexecutor 一起使用吗?
Can I use FlatfileItemReader with Taskexecutor?
我可以在 spring 批处理中将 FlatfileItemReader 与 Taskexecutor 一起使用吗?
我已经用 ThreadPoolTaskExecutor 实现了 FlatFileItemReader。当我在 ItemProcessor 中打印记录时,我没有得到一致的结果,即并非所有记录都被打印,有时其中一条记录被打印多次。这让我了解到 FlatFileItemReader 不是线程安全的,并且在 spring 文档中也有同样的说法,但我看到一些博客说可以将 FlatFileItemReader 与任务执行器一起使用。
所以我的问题是:是否可以将 FlatfileItemReader 与 Task Executor 一起使用?
@Bean
@StepScope
public FlatFileItemReader<DataLifeCycleEvent> csvFileReader(
@Value("#{stepExecution}") StepExecution stepExecution) {
Resource inputResource;
FlatFileItemReader<DataLifeCycleEvent> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new OnboardingLineMapper(stepExecution));
itemReader.setLinesToSkip(1);
itemReader.setSaveState(false);
itemReader.setSkippedLinesCallback(new OnboardingHeaderMapper(stepExecution));
String inputResourceString = stepExecution.getJobParameters().getString("inputResource");
inputResource = new FileSystemResource(inputFileLocation + ApplicationConstant.SLASH + inputResourceString);
itemReader.setResource(inputResource);
stepExecution.getJobExecution().getExecutionContext().putInt(ApplicationConstant.ERROR_COUNT, 0);
return itemReader;
}
FlatFileItemReader
扩展 AbstractItemCountingItemStreamItemReader
即 NOT thread-safe。所以如果在多线程的步骤中使用,需要同步。
您可以将其包装在 SynchronizedItemStreamReader
中。这是一个简单的例子:
@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader
SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(itemReader);
return synchronizedItemStreamReader;
}
此方法给出此异常:-java.lang.ClassCastException: com.sun.proxy.$Proxy344 无法转换为 org.springframework.batch.item.support.SynchronizedItemStreamReader
@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader
SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(itemReader);
return synchronizedItemStreamReader;
}
我可以在 spring 批处理中将 FlatfileItemReader 与 Taskexecutor 一起使用吗?
我已经用 ThreadPoolTaskExecutor 实现了 FlatFileItemReader。当我在 ItemProcessor 中打印记录时,我没有得到一致的结果,即并非所有记录都被打印,有时其中一条记录被打印多次。这让我了解到 FlatFileItemReader 不是线程安全的,并且在 spring 文档中也有同样的说法,但我看到一些博客说可以将 FlatFileItemReader 与任务执行器一起使用。
所以我的问题是:是否可以将 FlatfileItemReader 与 Task Executor 一起使用?
@Bean
@StepScope
public FlatFileItemReader<DataLifeCycleEvent> csvFileReader(
@Value("#{stepExecution}") StepExecution stepExecution) {
Resource inputResource;
FlatFileItemReader<DataLifeCycleEvent> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new OnboardingLineMapper(stepExecution));
itemReader.setLinesToSkip(1);
itemReader.setSaveState(false);
itemReader.setSkippedLinesCallback(new OnboardingHeaderMapper(stepExecution));
String inputResourceString = stepExecution.getJobParameters().getString("inputResource");
inputResource = new FileSystemResource(inputFileLocation + ApplicationConstant.SLASH + inputResourceString);
itemReader.setResource(inputResource);
stepExecution.getJobExecution().getExecutionContext().putInt(ApplicationConstant.ERROR_COUNT, 0);
return itemReader;
}
FlatFileItemReader
扩展 AbstractItemCountingItemStreamItemReader
即 NOT thread-safe。所以如果在多线程的步骤中使用,需要同步。
您可以将其包装在 SynchronizedItemStreamReader
中。这是一个简单的例子:
@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader
SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(itemReader);
return synchronizedItemStreamReader;
}
此方法给出此异常:-java.lang.ClassCastException: com.sun.proxy.$Proxy344 无法转换为 org.springframework.batch.item.support.SynchronizedItemStreamReader
@Bean
public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader
SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(itemReader);
return synchronizedItemStreamReader;
}