Spring Batch: Assemble a job rather than configuring it (extensible job configuration)
Background
I am working on designing a file reading layer that can read delimited files and load them into a List. I have decided to use Spring Batch because it offers a lot of scalability options that I can leverage for different sets of files depending on their size.
Requirements
- I want to design a generic job API that can be used to read any delimited file.
- There should be a single job structure used for parsing every delimited file. For example, if the system needs to read 5 files, there will be 5 jobs (one per file). The only way the 5 jobs differ from one another is that they use a different FieldSetMapper, column names, directory path, and additional scaling parameters such as the commit-interval and throttle-limit.
- The user of this API should not need to configure a Spring Batch job, step, chunking, partitioning, etc. on their own when a new file type is introduced into the system.
- All the user needs to do is provide the FieldSetMapper to be used by the job, along with the commit-interval, the throttle-limit, and the directory where each type of file is placed.
- There will be one predefined directory per file type. Each directory can contain multiple files of the same type and format. A MultiResourcePartitioner will be used to look inside a directory; the number of partitions equals the number of files in the directory (a short illustration follows this list).
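To illustrate the last requirement, here is a minimal sketch (with hypothetical file names) of what a MultiResourcePartitioner produces for a directory:

MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setResources(new Resource[] {
        new FileSystemResource("I:/CK/invoices/partitions/inv1.dat"),
        new FileSystemResource("I:/CK/invoices/partitions/inv2.dat") });

// One entry per file ("partition0", "partition1"); the grid size argument is
// effectively ignored. Each partition's ExecutionContext stores the file's
// URL string under the default key "fileName".
Map<String, ExecutionContext> partitions = partitioner.partition(2);

The value stored under "fileName" is the resource's URL string (with a scheme prefix such as file:), which is why the reader in the solution below strips the scheme before building a FileSystemResource.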
My requirement is to build a Spring Batch infrastructure that gives me a single job that I can launch once I have the pieces that make up the job.
My solution:
I created an abstract configuration class that is extended by concrete configuration classes (there will be 1 concrete class per file type to be read).
@Configuration
@EnableBatchProcessing
public abstract class AbstractFileLoader<T> {

    private static final String FILE_PATTERN = "*.dat";

    @Autowired
    JobBuilderFactory jobs;

    @Autowired
    ResourcePatternResolver resourcePatternResolver;

    public final Job createJob(Step s1, JobExecutionListener listener) {
        return jobs.get(this.getClass().getSimpleName())
                .incrementer(new RunIdIncrementer()).listener(listener)
                .start(s1).build();
    }

    public abstract Job loaderJob(Step s1, JobExecutionListener listener);

    public abstract FieldSetMapper<T> getFieldSetMapper();

    public abstract String getFilesPath();

    public abstract String[] getColumnNames();

    public abstract int getChunkSize();

    public abstract int getThrottleLimit();

    @Bean
    @StepScope
    @Value("#{stepExecutionContext['fileName']}")
    public FlatFileItemReader<T> reader(String file) {
        FlatFileItemReader<T> reader = new FlatFileItemReader<T>();
        String path = file.substring(file.indexOf(":") + 1, file.length());
        FileSystemResource resource = new FileSystemResource(path);
        reader.setResource(resource);
        DefaultLineMapper<T> lineMapper = new DefaultLineMapper<T>();
        lineMapper.setFieldSetMapper(getFieldSetMapper());
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(",");
        tokenizer.setNames(getColumnNames());
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1);
        return reader;
    }

    @Bean
    public ItemProcessor<T, T> processor() {
        // TODO add transformations here
        return null;
    }

    @Bean
    @JobScope
    public ListItemWriter<T> writer() {
        ListItemWriter<T> writer = new ListItemWriter<T>();
        return writer;
    }

    @Bean
    @JobScope
    public Step readStep(StepBuilderFactory stepBuilderFactory,
            ItemReader<T> reader, ItemWriter<T> writer,
            ItemProcessor<T, T> processor, TaskExecutor taskExecutor) {
        final Step readerStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:slave")
                .<T, T> chunk(getChunkSize()).reader(reader)
                .processor(processor).writer(writer).taskExecutor(taskExecutor)
                .throttleLimit(getThrottleLimit()).build();

        final Step partitionedStep = stepBuilderFactory
                .get(this.getClass().getSimpleName() + " ReadStep:master")
                .partitioner(readerStep)
                .partitioner(
                        this.getClass().getSimpleName() + " ReadStep:slave",
                        partitioner()).taskExecutor(taskExecutor).build();

        return partitionedStep;
    }

    /*
     * @Bean public TaskExecutor taskExecutor() { return new
     * SimpleAsyncTaskExecutor(); }
     */

    @Bean
    @JobScope
    public Partitioner partitioner() {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:"
                    + getFilesPath() + FILE_PATTERN);
        } catch (IOException e) {
            throw new RuntimeException(
                    "I/O problems when resolving the input file pattern.", e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }

    @Bean
    @JobScope
    public JobExecutionListener listener(ListItemWriter<T> writer) {
        return new JobCompletionNotificationListener<T>(writer);
    }

    /*
     * Use this if you want the writer to have job scope (JIRA BATCH-2269). Also
     * change the return type of writer to ListItemWriter for this to work.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor() {
            @Override
            protected void doExecute(final Runnable task) {
                // gets the jobExecution of the configuration thread
                final JobExecution jobExecution = JobSynchronizationManager
                        .getContext().getJobExecution();
                super.doExecute(new Runnable() {
                    public void run() {
                        JobSynchronizationManager.register(jobExecution);
                        try {
                            task.run();
                        } finally {
                            JobSynchronizationManager.close();
                        }
                    }
                });
            }
        };
    }
}
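A note on the taskExecutor bean above: it wraps SimpleAsyncTaskExecutor so that every worker thread first registers the launching thread's JobExecution with JobSynchronizationManager before running its task, and deregisters it afterwards. Without this, the partitioned worker threads could not resolve @JobScope beans such as the writer, which is the limitation tracked as BATCH-2269 referenced in the comment.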
For the sake of discussion, let's say I have to read invoice data. I can therefore extend the above class to create an InvoiceLoader:
@Configuration
public class InvoiceLoader extends AbstractFileLoader<Invoice> {

    private class InvoiceFieldSetMapper implements FieldSetMapper<Invoice> {
        public Invoice mapFieldSet(FieldSet f) {
            Invoice invoice = new Invoice();
            invoice.setNo(f.readString("INVOICE_NO"));
            return invoice;
        }
    }

    @Override
    public FieldSetMapper<Invoice> getFieldSetMapper() {
        return new InvoiceFieldSetMapper();
    }

    @Override
    public String getFilesPath() {
        return "I:/CK/invoices/partitions/";
    }

    @Override
    public String[] getColumnNames() {
        return new String[] { "INVOICE_NO", "DATE" };
    }

    @Override
    @Bean(name = "invoiceJob")
    public Job loaderJob(Step s1, JobExecutionListener listener) {
        return createJob(s1, listener);
    }

    @Override
    public int getChunkSize() {
        return 25254;
    }

    @Override
    public int getThrottleLimit() {
        return 8;
    }
}
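For completeness, a minimal sketch of the Invoice domain class assumed by the mapper above (it carries only the single property this example actually maps; the real class would hold whatever fields the file provides):

public class Invoice {

    private String no;

    public String getNo() {
        return no;
    }

    public void setNo(String no) {
        this.no = no;
    }
}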
Let's say I also have a class called InventoryLoader that extends AbstractFileLoader. On application startup, I can load both of these annotation configurations as follows:
AbstractApplicationContext context1 = new AnnotationConfigApplicationContext(InvoiceLoader.class, InventoryLoader.class);
Elsewhere in my application, two different threads can launch the jobs as follows:
Thread 1:
JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
Job job1 = context1.getBean("invoiceJob", Job.class);
JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);
Thread 2:
JobLauncher jobLauncher1 = context1.getBean(JobLauncher.class);
Job job1 = context1.getBean("inventoryJob", Job.class);
JobExecution jobExecution = jobLauncher1.run(job1, jobParams1);
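Equivalently, here is a small sketch of driving both launches from an ExecutorService (the job parameters are placeholders built with JobParametersBuilder; any identifying parameters would do):

ExecutorService pool = Executors.newFixedThreadPool(2);
JobLauncher jobLauncher = context1.getBean(JobLauncher.class);

// Placeholder parameters for this sketch.
JobParameters jobParams1 = new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
JobParameters jobParams2 = new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();

// JobLauncher.run blocks until the job finishes (with the default synchronous
// launcher), so the pool supplies the two concurrent threads.
pool.submit(() -> jobLauncher.run(context1.getBean("invoiceJob", Job.class), jobParams1));
pool.submit(() -> jobLauncher.run(context1.getBean("inventoryJob", Job.class), jobParams2));
pool.shutdown();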
The advantage of this approach is that whenever there is a new file to be read, all the developer/user has to do is subclass AbstractFileLoader and implement the required abstract methods, without needing to get into the details of how to assemble the job.
Questions:
- I am new to Spring Batch, so I may have overlooked some of the not-so-obvious problems with this approach, such as shared internal objects in Spring Batch that may cause two jobs running together to fail, or obvious problems such as the scoping of the beans.
- Is there a better way to achieve my objective?
@Value("#{stepExecutionContext['fileName']}")
的 fileName
属性总是被赋值为 I:/CK/invoices/partitions/
,这是 InvoiceLoader
中 getPath
方法返回的值,甚至虽然 getPathmethod in
InventoryLoader`returns 是不同的值。
Answer
One option is to pass them as job parameters. For instance:
@Bean
Job job() {
    jobs.get("myJob").start(step1(null)).build()
}

@Bean
@JobScope
Step step1(@Value('#{jobParameters["commitInterval"]}') commitInterval) {
    steps.get('step1')
            .chunk((int) commitInterval)
            .reader(new IterableItemReader(iterable: [1, 2, 3, 4], name: 'foo'))
            .writer(writer(null))
            .build()
}

@Bean
@JobScope
ItemWriter writer(@Value('#{jobParameters["writerClass"]}') writerClass) {
    applicationContext.classLoader.loadClass(writerClass).newInstance()
}
with MyWriter:
class MyWriter implements ItemWriter<Integer> {

    @Override
    void write(List<? extends Integer> items) throws Exception {
        println "Write $items"
    }
}
Then executed with:
def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([
        commitInterval: new JobParameter(3),
        writerClass: new JobParameter('MyWriter'),
]))
The output is:
INFO: Executing step: [step1]
Write [1, 2, 3]
Write [4]
Feb 24, 2016 2:30:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{commitInterval=3, writerClass=MyWriter}] and the following status: [COMPLETED]
Status is: COMPLETED, job execution id 0
#1 step1 COMPLETED
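For a caller written in Java rather than Groovy, the same parameters can be built with JobParametersBuilder, roughly:

JobParameters params = new JobParametersBuilder()
        .addLong("commitInterval", 3L)
        .addString("writerClass", "MyWriter")
        .toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, params);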
Full example here.