Spring 在运行时批量分区
Spring Batch Partitioning At Runtime
我想使用 spring 批处理读取一个大文件。我想分成多个文件并使用分区在不同的线程中处理每个文件。我正在使用以下代码:
@Bean
@StepScope
public MultiResourcePartitioner partitioner() {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(splitFiles());
return partitioner;
}
private Resource[] splitFiles() {
// Read the large File available in the specified folder
// split the file to smaller files and return them as resource list
}
@Bean
public TaskExecutorPartitionHandler partitionHandler() {
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setStep(step1());
partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
return partitionHandler;
}
@Bean
public Step partitionedMaster() {
return this.stepBuilderFactory.get("step1")
.partitioner(step1().getName(), partitioner(null))
.partitionHandler(partitionHandler())
.build();
}
@Bean
public Job partitionedJob() {
return this.jobBuilderFactory.get("partitionedJob")
.start(partitionedMaster())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{stepExecutionContext['file']}") Resource resource) {
return new FlatFileItemReaderBuilder<Transaction>()
.name("flatFileTransactionReader")
.resource(resource)
.fieldSetMapper(fsm)
.build();
}
我的问题是分区程序正在对仅在应用程序启动时的文件夹中可用的文件进行分区。应用程序启动后 运行,如果同一文件夹中有新文件可用,则作业无法读取 them/partition 它们。
我使用了@StepScope,但我仍然遇到问题。
如何在运行时动态读取和分区文件?
Editing it after the first answer:
您好,感谢您的意见。
我可以修改下面的代码以将文件作为参数发送并调用作业,但控件仍然没有进入分区程序方法,因此无法利用分区。
对此有任何意见吗?
public JobParameters getJobParameters() {
Resource[] resources = //getFileToProcessResource
return new JobParametersBuilder()
.addLong(TIME, System.currentTimeMillis())
.addString("inputFiles", resources)
.toJobParameters();
}
JobParameters jobParameters = getJobParameters();
jobLauncher.run(partitionedJob(), jobParameters);
@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(resources);
return partitioner;
}
Once the application is up and running, if a new file is available in the same folder, the job couldn't read them/partition them
批处理是针对固定的数据集。在您的情况下,您开始了一项工作,但其输入数据同时发生了变化,因此这不会像您预期的那样工作。可重启性需要固定数据集,以便在发生故障时处理相同的数据集。
由于您的作业输入是一个文件,您可以使用该文件作为作业参数并配置监视服务(或类似机制)为文件夹中的每个新文件启动一个新的作业实例。
编辑:添加示例以使分区程序了解作业参数
@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['fileName']}") String fileName) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(splitFiles(fileName));
return partitioner;
}
private Resource[] splitFiles(String fileName) {
// Read the large File available in the specified folder
// split the file to smaller files and return them as resource list
return null;
}
我想使用 spring 批处理读取一个大文件。我想分成多个文件并使用分区在不同的线程中处理每个文件。我正在使用以下代码:
@Bean
@StepScope
public MultiResourcePartitioner partitioner() {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(splitFiles());
return partitioner;
}
private Resource[] splitFiles() {
// Read the large File available in the specified folder
// split the file to smaller files and return them as resource list
}
@Bean
public TaskExecutorPartitionHandler partitionHandler() {
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setStep(step1());
partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
return partitionHandler;
}
@Bean
public Step partitionedMaster() {
return this.stepBuilderFactory.get("step1")
.partitioner(step1().getName(), partitioner(null))
.partitionHandler(partitionHandler())
.build();
}
@Bean
public Job partitionedJob() {
return this.jobBuilderFactory.get("partitionedJob")
.start(partitionedMaster())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<Transaction> fileTransactionReader(@Value("#{stepExecutionContext['file']}") Resource resource) {
return new FlatFileItemReaderBuilder<Transaction>()
.name("flatFileTransactionReader")
.resource(resource)
.fieldSetMapper(fsm)
.build();
}
我的问题是分区程序正在对仅在应用程序启动时的文件夹中可用的文件进行分区。应用程序启动后 运行,如果同一文件夹中有新文件可用,则作业无法读取 them/partition 它们。 我使用了@StepScope,但我仍然遇到问题。
如何在运行时动态读取和分区文件?
Editing it after the first answer:
您好,感谢您的意见。 我可以修改下面的代码以将文件作为参数发送并调用作业,但控件仍然没有进入分区程序方法,因此无法利用分区。 对此有任何意见吗?
public JobParameters getJobParameters() {
Resource[] resources = //getFileToProcessResource
return new JobParametersBuilder()
.addLong(TIME, System.currentTimeMillis())
.addString("inputFiles", resources)
.toJobParameters();
}
JobParameters jobParameters = getJobParameters();
jobLauncher.run(partitionedJob(), jobParameters);
@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(resources);
return partitioner;
}
Once the application is up and running, if a new file is available in the same folder, the job couldn't read them/partition them
批处理是针对固定的数据集。在您的情况下,您开始了一项工作,但其输入数据同时发生了变化,因此这不会像您预期的那样工作。可重启性需要固定数据集,以便在发生故障时处理相同的数据集。
由于您的作业输入是一个文件,您可以使用该文件作为作业参数并配置监视服务(或类似机制)为文件夹中的每个新文件启动一个新的作业实例。
编辑:添加示例以使分区程序了解作业参数
@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['fileName']}") String fileName) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(splitFiles(fileName));
return partitioner;
}
private Resource[] splitFiles(String fileName) {
// Read the large File available in the specified folder
// split the file to smaller files and return them as resource list
return null;
}