多线程步骤在读取平面文件时如何在内部工作?

How multithreaded Steps works internally while reading the flatfile?

我想阅读 10 GB 的平面文件。为此,我选择使用 ThreadPoolTask​​Executor 使我的步骤成为多线程。

我想知道这 4 个工作线程在内部是如何工作的?一个线程如何不读取另一个线程读取的数据。如果有人能解释它在内部是如何工作的,那将是很大的帮助。

@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{jobParameters['inputFlatFile']}") Resource resource) {

        return new FlatFileItemReaderBuilder<Transaction>()
                .saveState(false)
                .resource(resource)
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .fieldSetMapper(fieldSet -> {
                    Transaction transaction = new Transaction();
                    transaction.setAccount(fieldSet.readString("account"));
                    transaction.setAmount(fieldSet.readBigDecimal("amount"));
                    transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));

                    return transaction;
                })
                .build();
    }

代码-

@Bean
public Job multithreadedJob() {
    return this.jobBuilderFactory.get("multithreadedJob")
            .start(step1())
            .build();
}

@Bean
public Step step1() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(4);
    taskExecutor.setMaxPoolSize(4);
    taskExecutor.afterPropertiesSet();

    return this.stepBuilderFactory.get("step1")
            .<Transaction, Transaction>chunk(100)
            .reader(fileTransactionReader(null))
            .writer(writer(null))
            .taskExecutor(taskExecutor)
            .build();
}

FlatFileItemReader 本身不是线程安全的,因为它扩展了 AbstractItemCountingItemStreamItemReader,其 javadoc 声明 Subclasses are inherently not thread-safe。所以严格来说,你应该把它包裹在SynchronizedItemStreamReader中。另见:

话说回来,如果你

  • 不关心可重启性,
  • 不关心行号,
  • 不要使用需要状态的映射,
  • saveState设置为false,
  • 并且不要更改 reader 的默认值 bufferedReaderFactory

那么 reader 只是

的薄包装
  • a BufferedReader 其方法 readLine 为每个 FlatFileItemReader::read
  • 调用
  • 和将每一行映射到目标类型的 LineMapper

并且 BufferedReader 是线程安全的,这使得您的 reader 在多线程步骤中有效地安全调用。

但请注意:Spring 批处理 API 不保证 reader 的线程安全。实际上恰恰相反。因此,多线程行为至少在理论上可以在未来版本中进行更改。此外,上面列出的许多条件有一天可能不再适用于您的实施。因此,真正推荐使用 SynchronizedItemStreamReader

另见 Can spring batch multi-threaded step be used safely if number of items in file are very less?