Spring Batch Partitioning: inject stepExecutionContext parameter in itemReader
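I am trying to learn Spring Batch with a Partitioner.
The issue is that I need to set the filename dynamically from the Partitioner implementation, and I am trying to read it in the itemReader. But the filename comes through as null.
My Spring Batch configuration: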
@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename)
        throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}

@Bean(name = "partitioningJob")
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();
}

@Bean
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
            .partitioner(step2())
            .partitioner("step2", partitioner())
            .gridSize(2)
            .taskExecutor(taskExecutor)
            .build();
}

@Bean
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("step2")
            .<Transaction, Transaction> chunk(1)
            .reader(itemReader(null))
            .processor(itemProcessor())
            .writer(itemWriter(marshaller(), null))
            .build();
}

@Bean
public TransactionPartitioner partitioner() {
    TransactionPartitioner partitioner = new TransactionPartitioner();
    return partitioner;
}

@Bean
public JobListener jobListener() {
    return new JobListener();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
And my TransactionPartitioner class is:
public class TransactionPartitioner implements Partitioner {
    public Map<String, ExecutionContext> partition(int range) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        for (int i = 1; i <= range; i++) {
            ExecutionContext exContext = new ExecutionContext();
            exContext.put("filename", "input" + i + ".csv");
            exContext.put("name", "Thread" + i);
            result.put("partition" + i, exContext);
        }
        return result;
    }
}
Is this not the right way to do it? Please advise.
Here is the stack trace:
18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:135)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy19.run(Unknown Source)
at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:139)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:136)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
... 9 more
Following @Sabir's suggestion, I checked my data. The step execution context table looks like this:
| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
| 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL |
| 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL |
| 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}
Here is the complete code: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing
Calling the partition method is not the responsibility of application code, as you are doing below:
@Bean
public TransactionPartitioner partitioner() {
    TransactionPartitioner partitioner = new TransactionPartitioner();
    partitioner.partition(10);
    return partitioner;
}
The framework will call the partition method for you. You only need to return the Partitioner, without explicitly calling partition(10).
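To make that division of labour concrete, here is a minimal plain-Java sketch with no Spring on the classpath. The class name PartitionSketch is hypothetical, and a plain Map stands in for Spring Batch's ExecutionContext; only the loop body mirrors the TransactionPartitioner from the question. The point is that the caller supplies the grid size and receives one context per partition, so the bean method has no reason to call partition itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSketch {

    // Plain-Java stand-in for Partitioner.partition(int gridSize).
    // Each inner map plays the role of one ExecutionContext (an assumption
    // made so the sketch runs without Spring Batch).
    static Map<String, Map<String, Object>> partition(int gridSize) {
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        for (int i = 1; i <= gridSize; i++) {
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("filename", "input" + i + ".csv");
            context.put("name", "Thread" + i);
            result.put("partition" + i, context);
        }
        return result;
    }

    public static void main(String[] args) {
        // In Spring Batch the framework makes this call, passing the step's gridSize.
        partition(2).forEach((name, context) -> System.out.println(name + " -> " + context));
    }
}
```

With a grid size of 2 the sketch yields two entries, partition1 and partition2, each carrying its own filename, which matches the two worker rows in the step execution context table from the question.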
That said, you need to set the partitioner's gridSize on the partitioned step, as shown below:
@Bean
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
            .partitioner(step2())
            .partitioner("step2", partitioner())
            .gridSize(10)
            .taskExecutor(taskExecutor)
            .build();
}
The points above may be the root cause of your problem. The rest of your code looks fine.
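I checked your code and tried to run it.
Currently it does not bind the filename at the step-scope level.
You have two configuration files:
- SpringConfig - contains the Spring-related configuration beans
- SpringBatchConfig - contains the Spring Batch-related beans
The first one carries the annotations @EnableBatchProcessing and @Configuration.
But itemReader is defined in the other configuration file, which has no annotations at all.
You should have @Configuration on the other file as well.
Or
You can add both annotations to the SpringBatchConfig configuration file and skip them in the Spring one (SpringConfig).
Without this, the configuration is not read properly, and itemReader is not treated as step-scoped (that is, the @StepScope annotation has no effect), so values are not bound at the step level and you get a NULL value.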
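The symptom can be illustrated with a small plain-Java analogy. This is not Spring's actual step-scope mechanism: the class LateBindingSketch and its methods are hypothetical, and a Map again stands in for the step execution context. The sketch assumes that, when the step scope is not in effect, the reader method is invoked as an ordinary Java call with the null placeholder that step2() passes in, which would explain the resource name input/null in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LateBindingSketch {

    // Stand-in for the reader factory method: derives the resource path from the filename.
    static String resourcePath(String filename) {
        return "input/" + filename;
    }

    // Eager: the method runs once while the step is wired, with the placeholder null.
    static String eagerPath() {
        return resourcePath(null);
    }

    // Deferred: creation waits until a step context is available, then reads the key.
    static Function<Map<String, Object>, String> deferredPath() {
        return stepContext -> resourcePath((String) stepContext.get("filename"));
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        stepContext.put("filename", "input1.csv");

        System.out.println(eagerPath());                          // input/null
        System.out.println(deferredPath().apply(stepContext));    // input/input1.csv
    }
}
```

The deferred variant is only a rough picture of what a working @StepScope provides: the reader is built per step execution, after the partition's context exists, so the filename key can be resolved.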