多线程步骤在读取平面文件时如何在内部工作?
How multithreaded Steps works internally while reading the flatfile?
我想阅读 10 GB 的平面文件。为此,我选择使用 ThreadPoolTaskExecutor 使我的步骤成为多线程。
我想知道这 4 个工作线程在内部是如何工作的?一个线程如何不读取另一个线程读取的数据。如果有人能解释它在内部是如何工作的,那将是很大的帮助。
@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{jobParameters['inputFlatFile']}") Resource resource) {
return new FlatFileItemReaderBuilder<Transaction>()
.saveState(false)
.resource(resource)
.delimited()
.names(new String[] {"account", "amount", "timestamp"})
.fieldSetMapper(fieldSet -> {
Transaction transaction = new Transaction();
transaction.setAccount(fieldSet.readString("account"));
transaction.setAmount(fieldSet.readBigDecimal("amount"));
transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));
return transaction;
})
.build();
}
代码-
@Bean
public Job multithreadedJob() {
return this.jobBuilderFactory.get("multithreadedJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setMaxPoolSize(4);
taskExecutor.afterPropertiesSet();
return this.stepBuilderFactory.get("step1")
.<Transaction, Transaction>chunk(100)
.reader(fileTransactionReader(null))
.writer(writer(null))
.taskExecutor(taskExecutor)
.build();
}
FlatFileItemReader
本身不是线程安全的,因为它扩展了 AbstractItemCountingItemStreamItemReader
,其 javadoc 声明 Subclasses are inherently not thread-safe
。所以严格来说,你应该把它包裹在SynchronizedItemStreamReader
中。另见:
话说回来,如果你
- 不关心可重启性,
- 不关心行号,
- 不要使用需要状态的映射,
- 将
saveState
设置为false
,
- 并且不要更改 reader 的默认值
bufferedReaderFactory
、
那么 reader 只是
的薄包装
- a
BufferedReader
其方法 readLine
为每个 FlatFileItemReader::read
、 调用
- 和将每一行映射到目标类型的
LineMapper
并且 BufferedReader
是线程安全的,这使得您的 reader 在多线程步骤中有效地安全调用。
但请注意:Spring 批处理 API 不保证 reader 的线程安全。实际上恰恰相反。因此,多线程行为至少在理论上可以在未来版本中进行更改。此外,上面列出的许多条件有一天可能不再适用于您的实施。因此,真正推荐使用 SynchronizedItemStreamReader
。
另见 Can spring batch multi-threaded step be used safely if number of items in file are very less?
我想阅读 10 GB 的平面文件。为此,我选择使用 ThreadPoolTaskExecutor 使我的步骤成为多线程。
我想知道这 4 个工作线程在内部是如何工作的?一个线程如何不读取另一个线程读取的数据。如果有人能解释它在内部是如何工作的,那将是很大的帮助。
@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{jobParameters['inputFlatFile']}") Resource resource) {
return new FlatFileItemReaderBuilder<Transaction>()
.saveState(false)
.resource(resource)
.delimited()
.names(new String[] {"account", "amount", "timestamp"})
.fieldSetMapper(fieldSet -> {
Transaction transaction = new Transaction();
transaction.setAccount(fieldSet.readString("account"));
transaction.setAmount(fieldSet.readBigDecimal("amount"));
transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));
return transaction;
})
.build();
}
代码-
@Bean
public Job multithreadedJob() {
return this.jobBuilderFactory.get("multithreadedJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setMaxPoolSize(4);
taskExecutor.afterPropertiesSet();
return this.stepBuilderFactory.get("step1")
.<Transaction, Transaction>chunk(100)
.reader(fileTransactionReader(null))
.writer(writer(null))
.taskExecutor(taskExecutor)
.build();
}
FlatFileItemReader
本身不是线程安全的,因为它扩展了 AbstractItemCountingItemStreamItemReader
,其 javadoc 声明 Subclasses are inherently not thread-safe
。所以严格来说,你应该把它包裹在SynchronizedItemStreamReader
中。另见:
话说回来,如果你
- 不关心可重启性,
- 不关心行号,
- 不要使用需要状态的映射,
- 将
saveState
设置为false
, - 并且不要更改 reader 的默认值
bufferedReaderFactory
、
那么 reader 只是
的薄包装- a
BufferedReader
其方法readLine
为每个FlatFileItemReader::read
、 调用
- 和将每一行映射到目标类型的
LineMapper
并且 BufferedReader
是线程安全的,这使得您的 reader 在多线程步骤中有效地安全调用。
但请注意:Spring 批处理 API 不保证 reader 的线程安全。实际上恰恰相反。因此,多线程行为至少在理论上可以在未来版本中进行更改。此外,上面列出的许多条件有一天可能不再适用于您的实施。因此,真正推荐使用 SynchronizedItemStreamReader
。
另见 Can spring batch multi-threaded step be used safely if number of items in file are very less?