How to access StepExecution in partitioned CompositeItemProcessor

I have a case where I want to access the StepExecution from inside a Processor that is part of a CompositeItemProcessor, which in turn is part of a partitioned step.

I can access the StepExecution in the first partition, but not in the subsequent partitions.

Here is my code:

Job:

@Bean("pendingPaymentAggregatorJob")
public Job pendingPaymentAggregatorJob(Step populatePendingPaymentAggregatorCache,
                                       Step calculatePaymentRunPeriods,
                                       Step truncatePaymentPendingTables,
                                       Step fetchMaxPaymentId,
                                       Flow pendingPaymentsGenerationFlowForCompleteMonth,
                                       Step populatePendingTablesSecondHalfOfMonth) {
    return jobBuilderFactory.get("JOB")
            .start(populatePendingPaymentAggregatorCache)
            .next(calculatePaymentRunPeriods)
            .next(truncatePaymentPendingTables)
            .next(fetchMaxPaymentId)
            .next(currentPeriodFlowDecider())
            .on(FIRST_HALF_OF_MONTH).to(pendingPaymentsGenerationFlowForCompleteMonth)
            .from(currentPeriodFlowDecider())
            .on(SECOND_HALF_OF_MONTH).to(populatePendingTablesSecondHalfOfMonth)
            .end()
            .incrementer(new RunIdIncrementer())
            .listener(jobExecutionListener)
            .build();
}

Partitioned step:

var workerStep = stepBuilderFactory.get("NAME-worker")
           .<T, W>chunk(chunkSize)
           .reader(itemReader())
           .writer(itemWriter())
           .processor(compositeItemProcessor)
           .faultTolerant()
           .retryLimit(3)
           .retry(Exception.class)
           .listener(stepExecutionListener)
           .build();

SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(maxConcurrency);

return stepBuilderFactory.get("NAME")
          .partitioner(workerStep.getName(), partitioner())
          .step(workerStep)
          .gridSize(GRID_SIZE)
          .taskExecutor(simpleAsyncTaskExecutor)
          .build();

CompositeItemProcessor:

@Bean
public CompositeItemProcessor<PendingPayment, PaymentPendingEnvelop> compositeItemProcessor(
        PendingTaxDetailsProcessor pendingTaxDetailsProcessor,
        VatCalculatorProcessor vatCalculatorProcessor) throws Exception {

    CompositeItemProcessor<PendingPayment, PaymentPendingEnvelop> compositeItemProcessor = new CompositeItemProcessor<>();
    compositeItemProcessor.setDelegates(List.of(
            pendingTaxDetailsProcessor,
            vatCalculatorProcessor,
            (ItemProcessor<PendingPaymentsProcessorEnvelope, PaymentPendingEnvelop>) processorEnvelope -> PaymentPendingEnvelop.builder()
                    .paymentPendingTaxDetails(processorEnvelope.getPaymentPendingTaxDetails())
                    .build()
    ));
    compositeItemProcessor.afterPropertiesSet();

    return compositeItemProcessor;
}

Processor:

@Slf4j
@Configuration
@Component
@StepScope
public class PendingTaxDetailsProcessor implements ItemProcessor<PendingPayment, PendingPaymentsProcessorEnvelope> {

    private StepExecution stepExecution;

    @Autowired
    public PendingTaxDetailsProcessor(@Value("#{stepExecution}") StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    @Override
    public PendingPaymentsProcessorEnvelope process(PendingPayment pendingPayment) throws Exception {
        // Code

        int maxPaymentId = stepExecution.getJobExecution().getExecutionContext().getInt("maxPaymentId");
        if (maxPaymentId == 0) {
            throw new InstantiationException("Unable to get PaymentId from ExecutionContext");
        }
        int paymentId = maxPaymentId + 1;
        stepExecution.getJobExecution().getExecutionContext().put("maxPaymentId", paymentId);

        // Code

    }
}

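I am able to access the context in the first partition that reaches this step, but when the second partition tries to do the same, an exception is thrown. In the log below you can see that the first partition reads and writes successfully, while the second partition throws an exception: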

2022-01-18 13:56:31.165  INFO 10193 --- [cTaskExecutor-1] c.a.b.l.CommonStepExecutionListener      : Step: POPULATE_PAYMENT_PENDING_TABLES_STEP-Period_1-worker:partition_1_2, Completion time: 2022-01-18T13:56:31.164932, Time taken: 2.153457343 secs, Read count: 1, Write count: 1, Status: COMPLETED
2022-01-18 13:56:33.583  INFO 10193 --- [cTaskExecutor-6] c.a.b.l.CommonStepExecutionListener      : Step: POPULATE_PAYMENT_PENDING_TABLES_STEP-Period_1-worker:partition_2_2 started at: 2022-01-18T13:56:33.583653
2022-01-18 13:56:33.900 ERROR 10193 --- [cTaskExecutor-6] o.s.batch.core.step.AbstractStep         : Encountered an error executing step POPULATE_PAYMENT_PENDING_TABLES_STEP-Period_1-worker in job PENDING_PAYMENTS_AGGREGATOR_JOB

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.pendingTaxDetailsProcessor' defined in file [/Users/user/workspace/billing-paymentrun-service/target/classes/com/awin/billing/steps/pendingpaymentaggregator/processor/PendingTaxDetailsProcessor.class]: Unexpected exception during bean creation; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.batch.core.StepExecution' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:537)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean(AbstractBeanFactory.java:374)
    at org.springframework.batch.core.scope.StepScope.get(StepScope.java:113)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676)
    at com.awin.billing.steps.pendingpaymentaggregator.processor.PendingTaxDetailsProcessor$$EnhancerBySpringCGLIB$c5e2a24.process(<generated>)
    at org.springframework.batch.item.support.CompositeItemProcessor.processItem(CompositeItemProcessor.java:63)
    at org.springframework.batch.item.support.CompositeItemProcessor.process(CompositeItemProcessor.java:52)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134)
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.doWithRetry(FaultTolerantChunkProcessor.java:239)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217)
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:308)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:138)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.call(TaskExecutorPartitionHandler.java:135)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:280)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.batch.core.StepExecution' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887)
    at org.springframework.beans.factory.support.ConstructorResolver.resolvePreparedArguments(ConstructorResolver.java:834)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:153)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1354)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1193)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
    ... 32 common frames omitted

So, after trying a few different approaches, I got it working.

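After the first partition, bean creation failed while resolving the StepExecution object. With field injection instead of constructor injection, it works fine.

So instead of injecting the StepExecution through the constructor,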

@Autowired
public PendingTaxDetailsProcessor(@Value("#{stepExecution}") StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

we can use field injection:

@Value("#{stepExecution}")
private StepExecution stepExecution;

I don't know why this happens (it may be a bug), but it works for me.

There are actually a few issues here.

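  1. You need to remove the @Configuration annotation, because this is a component and not a configuration class.
  2. Since you are injecting the StepExecution through the constructor (which should work), you do not need the @BeforeStep method at all.
  3. It looks like you only need a single value from the ExecutionContext, so it might make sense to inject just that value:
@Autowired
public PendingTaxDetailsProcessor(@Value("#{jobExecutionContext['maxPaymentId']}") Integer maxPaymentId) {
    this.maxPaymentId = maxPaymentId;
}
  4. Since you are updating the value you retrieve, it should probably live in the StepContext rather than the JobContext, and then be promoted to the JobContext via a StepExecutionListener.
  5. Also, since you are updating the value, an Integer will not work because it is immutable. You might want to wrap it in an object that can be updated; then you do not need to put the value back into the ExecutionContext after updating it.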
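
As a rough illustration of the last point, here is a minimal plain-Java sketch of a mutable, thread-safe holder. PaymentIdSequence and PaymentIdSequenceDemo are hypothetical names, not Spring Batch classes, and the Map only stands in for an ExecutionContext. I have not checked whether such a holder round-trips through the serializer your job repository uses, so treat that part as an assumption.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder (not part of Spring Batch): wraps the running payment id
// so it can be updated in place instead of being re-put into the context.
final class PaymentIdSequence implements Serializable {
    private static final long serialVersionUID = 1L;
    private final AtomicInteger current;

    PaymentIdSequence(int maxPaymentId) {
        this.current = new AtomicInteger(maxPaymentId);
    }

    // Atomically reserves and returns the next id; safe across partition threads.
    int next() {
        return current.incrementAndGet();
    }

    int current() {
        return current.get();
    }
}

class PaymentIdSequenceDemo {
    // Simulates `partitions` worker threads, each reserving `idsPerPartition` ids,
    // and returns the highest id handed out.
    static int run(int startId, int partitions, int idsPerPartition) {
        // Stand-in for an ExecutionContext; the key matches the one used in the question.
        Map<String, Object> context = new ConcurrentHashMap<>();
        context.put("maxPaymentId", new PaymentIdSequence(startId));

        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        for (int p = 0; p < partitions; p++) {
            pool.submit(() -> {
                PaymentIdSequence seq = (PaymentIdSequence) context.get("maxPaymentId");
                for (int i = 0; i < idsPerPartition; i++) {
                    seq.next(); // no put() back into the context is needed
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return ((PaymentIdSequence) context.get("maxPaymentId")).current();
    }

    public static void main(String[] args) {
        System.out.println(run(100, 4, 250)); // prints 1100
    }
}
```

The read-then-put sequence in the original processor is not atomic, so two partitions running at the same time could read the same maxPaymentId and hand out the same id. A holder like this makes the increment a single atomic step.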