当前线程的范围 'job' 不活动,作业范围 Spring-批处理没有可用的上下文持有者

Scope 'job' is not active for the current thread, No context holder available for job scope Spring-Batch

在我的 Spring 批处理作业中,我尝试使用 JobExecutionContext 在步骤之间共享数据,这只有在我保持步骤单线程时才有效,如下所示:

    @EnableTask
    @EnableBatchProcessing
    @Configuration
    @PropertySource(value = {"classpath:application.properties"})

    public class Config{

    private static final HashMap<String,Object> OVERRIDDEN_BY_EXPRESSION = null;
    private static final String QUERY = "SELECT * FROM \"Config\"";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    EntityManager em;


    @Autowired
    DataSource dataSource;


    /*Config Step*/

   @Bean
    public JdbcCursorItemReader<BatchConfig> configReader(DataSource dataSource) {
        JdbcCursorItemReader<BatchConfig> config = new JdbcCursorItemReader<>();
            config.setDataSource(dataSource);

            config.setSql(QUERY);
            config.setRowMapper(new BatchRowMapper());

            return config;
    }

    @Bean
    public ItemWriter<BatchConfig> itemWriter() {
        return new ItemWriter<BatchConfig>() {

            private StepExecution stepExecution;

            @Override
            public void write(List<? extends BatchConfig> items) {
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                for (BatchConfig item : items) {
                    HashMap<String, Object> table = new HashMap<>();
                    table.put("date", item.getDate_time());
                    table.put("size", item.getSize());
                    System.out.println(table);
                    stepContext.put(item.getName(), table);

                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step stepConfig(JdbcCursorItemReader<BatchConfig> configReader) throws Exception {
        return stepBuilderFactory.get("stepConfig")
                .<BatchConfig, BatchConfig>chunk(10)
                .reader(configReader)
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"COUNTRY", "CATEGORY", "USER"});
        return listener;
    }



    /*Country Step*/

    @JobScope
    @Bean
    public MongoItemReader<COUNTRY> CountryItemReader(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> table) {
       int date = (int) table.get("date");
        MongoItemReader<COUNTRY> reader = new MongoItemReader<COUNTRY>();
        reader.setTemplate(mongoTemplate);
        reader.setTargetType(COUNTRY.class);
        reader.setCollection("COUNTRY");
        reader.setFields("{\"COUNTRY_NAME\": 1,\"SHORT_NAME\": 1,\"DEPT_CODE\": 1}");
        reader.setSort(new HashMap<String, Sort.Direction>() {{
            put("_id", Sort.Direction.DESC);
        }});
        reader.setQuery("{DATE_TIME: {$gt:"+date+"}}");
        reader.setPageSize(250);
        return reader;
    }

    @Bean
    public CountryItemProcessor CountryProcessor(){
        return new CountryItemProcessor();
    }

    @Bean
    public JpaItemWriter<COUNTRY> country_writer(){
        JpaItemWriter<COUNTRY> jpa = new JpaItemWriter<COUNTRY>();
        jpa.setEntityManagerFactory(em.getEntityManagerFactory());
        return jpa;
    }


    @JobScope
    @Bean
    public Step step1(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> tab) {
        int size = (int) tab.get("size");
        //System.out.println(size);
        return stepBuilderFactory.get("step1")
                .<COUNTRY, COUNTRY>chunk(20)
                .reader(CountryItemReader(OVERRIDDEN_BY_EXPRESSION))
                .writer(country_writer())
                .build();
    }



    @Bean
    public Job TestJob(Step stepConfig) throws Exception {

        return this.jobBuilderFactory.get("TestJob")
                .incrementer(new RunIdIncrementer())// because a spring config bug, this incrementer is not really useful
                .start(stepConfig)
                .next(step1(OVERRIDDEN_BY_EXPRESSION))
               
                .build();
    }

}

但是添加SimpleAsyncTaskExecutor时出现错误:

    org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.CountryItemReader': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:383) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.6.jar:5.3.6]
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676) ~[spring-aop-5.3.6.jar:5.3.6]
    at org.springframework.batch.item.data.MongoItemReader$$EnhancerBySpringCGLIB$443e4.read(<generated>) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar:5.3.6]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92) ~[spring-batch-core-4.3.2.jar:4.3.2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371) ~[spring-beans-5.3.6.jar:5.3.6]

我尝试像这样解决这个问题: https://github.com/spring-projects/spring-batch/issues/1335,但似乎除了主线程外它只使用了一个线程。

有没有办法在不添加调整代码的情况下解决这个问题?

我计划在 Kubernetes 上使用远程分区扩展作业,这个问题是否会因为作业范围而持续存在?

欢迎任何想法或建议。

I'm trying to share data between steps using JobExecutionContext, which works only if i keep the steps single threaded

依靠执行上下文在多线程步骤之间共享数据是不正确的,因为键将被并发线程覆盖。参考文档明确提到在多线程环境下关闭状态管理:

  • Javadoc: remember to use saveState=false if used in a multi-threaded client
  • Reference doc: it is not recommended to use job-scoped beans in multi-threaded or partitioned steps

就是说,我看不出从多线程步骤到下一步可以共享什么密钥(因为线程是并行执行的),但如果你真的需要这样做,你应该使用另一个类似于定义线程安全的共享 bean 的方法。