Spring Batch Partitioning: injecting a stepExecutionContext parameter into the itemReader

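I am trying to learn Spring Batch with a Partitioner.

The problem is that I need to set the filename dynamically from the Partitioner implementation, and I am trying to read it in the itemReader. However, the filename comes through as null.

My Spring Batch configuration: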

@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename) 
    throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource(
        "input/"+filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean(name = "partitioningJob")  
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();  
}  

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(2).taskExecutor(taskExecutor).build();  
}  

@Bean 
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("step2").<Transaction, Transaction> chunk(1).reader(itemReader(null)).processor(itemProcessor()).writer(itemWriter(marshaller(),null)).build();  
}  

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    return partitioner;  
}                           

@Bean 
public JobListener jobListener() {  
   return new JobListener();  
} 

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

And my TransactionPartitioner class is:

public class TransactionPartitioner implements Partitioner {

    public Map<String, ExecutionContext> partition(int range) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        for (int i = 1; i <= range; i++) {
            ExecutionContext exContext = new ExecutionContext();
            exContext.put("filename", "input" + i + ".csv");
            exContext.put("name", "Thread" + i);
            result.put("partition" + i, exContext);
        }
        return result;
    }
}

Is this not the right way to do it? Please advise.

Here is the stack trace:

  18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy19.run(Unknown Source)
    at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:139)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:136)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
    ... 9 more

Following @Sabir's suggestion, I checked my data. The step execution context table looks like this:

| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
|                 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL               |
|                 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]}                                                                                            | NULL               |
|                 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}  

Here is the complete code: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing

Calling the partition method is not the responsibility of application code, as you have done below:

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    partitioner.partition(10);  
    return partitioner;  
}  

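The framework calls the partition method for you. You only need to return the Partitioner, without explicitly calling partition(10).

That said, you do need to set the partitioner's gridSize on the partition step, as shown below: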

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(10).taskExecutor(taskExecutor).build();  
}  

The points above may be the root cause of your problem. The rest of your code looks fine.
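To illustrate what the framework obtains when it calls partition with the configured gridSize, here is a minimal sketch in plain Java. It is not the Spring Batch API: the class name PartitionSketch is hypothetical, and a plain Map stands in for ExecutionContext so the example runs without the framework on the classpath. The loop body mirrors the TransactionPartitioner from the question.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {

    // Stand-in for TransactionPartitioner.partition(int). A plain Map replaces
    // Spring Batch's ExecutionContext (an assumption made for illustration only).
    static Map<String, Map<String, Object>> partition(int gridSize) {
        Map<String, Map<String, Object>> result = new TreeMap<>();
        for (int i = 1; i <= gridSize; i++) {
            Map<String, Object> context = new TreeMap<>();
            context.put("filename", "input" + i + ".csv");
            context.put("name", "Thread" + i);
            result.put("partition" + i, context);
        }
        return result;
    }

    public static void main(String[] args) {
        // In a real job the partition step passes its gridSize here;
        // the configuration code never calls partition itself.
        Map<String, Map<String, Object>> partitions = partition(2);
        for (Map.Entry<String, Map<String, Object>> entry : partitions.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().get("filename"));
        }
    }
}
```

Each map entry corresponds to one worker step execution, and the values stored under "filename" are what a step-scoped reader would later see in its stepExecutionContext.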

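I checked your code and tried running it.

At the moment it does not bind the filename at the step-scope level.

You have two configuration files: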

  1. SpringConfig - contains the Spring-related configuration beans
  2. SpringBatchConfig - contains the Spring Batch-related beans

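The first one carries the annotations @EnableBatchProcessing and @Configuration.

However, itemReader is defined in the other configuration file, which has no annotations at all.

You should have @Configuration on the other file as well.

Alternatively, you can add both annotations to the SpringBatchConfig configuration file and omit them from SpringConfig.

Without this, the configuration is not read correctly and itemReader is not treated as step-scoped (i.e., the @StepScope annotation has no effect), so the value is not bound at the step level and you get a NULL value.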
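As a rough sketch of the second option, the class-level annotations on SpringBatchConfig might look like the fragment below. It is a configuration fragment rather than a complete program: it assumes the Spring Batch and Spring Context dependencies from the question's project are on the classpath, and the existing bean methods are left out.

```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Configuration;

// Marking the class as a configuration class lets Spring process its @Bean
// methods, including the @StepScope on itemReader.
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {
    // Existing @Bean methods from the question (itemReader, step2,
    // partitionStep, partitioner, ...) stay as they are.
}
```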