Scope 'job' is not active for the current thread: No context holder available for job scope (Spring Batch)
In my Spring Batch job, I'm trying to share data between steps using the JobExecutionContext. This works only if I keep the steps single-threaded, as follows:
@EnableTask
@EnableBatchProcessing
@Configuration
@PropertySource(value = {"classpath:application.properties"})
public class Config {

    private static final HashMap<String, Object> OVERRIDDEN_BY_EXPRESSION = null;
    private static final String QUERY = "SELECT * FROM \"Config\"";

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    EntityManager em;

    @Autowired
    DataSource dataSource;

    /*Config Step*/
    @Bean
    public JdbcCursorItemReader<BatchConfig> configReader(DataSource dataSource) {
        JdbcCursorItemReader<BatchConfig> config = new JdbcCursorItemReader<>();
        config.setDataSource(dataSource);
        config.setSql(QUERY);
        config.setRowMapper(new BatchRowMapper());
        return config;
    }

    @Bean
    public ItemWriter<BatchConfig> itemWriter() {
        return new ItemWriter<BatchConfig>() {
            private StepExecution stepExecution;

            @Override
            public void write(List<? extends BatchConfig> items) {
                ExecutionContext stepContext = this.stepExecution.getExecutionContext();
                for (BatchConfig item : items) {
                    HashMap<String, Object> table = new HashMap<>();
                    table.put("date", item.getDate_time());
                    table.put("size", item.getSize());
                    System.out.println(table);
                    stepContext.put(item.getName(), table);
                }
            }

            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        };
    }

    @Bean
    public Step stepConfig(JdbcCursorItemReader<BatchConfig> configReader) throws Exception {
        return stepBuilderFactory.get("stepConfig")
                .<BatchConfig, BatchConfig>chunk(10)
                .reader(configReader)
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"COUNTRY", "CATEGORY", "USER"});
        return listener;
    }

    /*Country Step*/
    @JobScope
    @Bean
    public MongoItemReader<COUNTRY> CountryItemReader(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> table) {
        int date = (int) table.get("date");
        MongoItemReader<COUNTRY> reader = new MongoItemReader<COUNTRY>();
        reader.setTemplate(mongoTemplate);
        reader.setTargetType(COUNTRY.class);
        reader.setCollection("COUNTRY");
        reader.setFields("{\"COUNTRY_NAME\": 1,\"SHORT_NAME\": 1,\"DEPT_CODE\": 1}");
        reader.setSort(new HashMap<String, Sort.Direction>() {{
            put("_id", Sort.Direction.DESC);
        }});
        reader.setQuery("{DATE_TIME: {$gt:" + date + "}}");
        reader.setPageSize(250);
        return reader;
    }

    @Bean
    public CountryItemProcessor CountryProcessor() {
        return new CountryItemProcessor();
    }

    @Bean
    public JpaItemWriter<COUNTRY> country_writer() {
        JpaItemWriter<COUNTRY> jpa = new JpaItemWriter<COUNTRY>();
        jpa.setEntityManagerFactory(em.getEntityManagerFactory());
        return jpa;
    }

    @JobScope
    @Bean
    public Step step1(@Value("#{jobExecutionContext['COUNTRY']}") HashMap<String, Object> tab) {
        int size = (int) tab.get("size");
        //System.out.println(size);
        return stepBuilderFactory.get("step1")
                .<COUNTRY, COUNTRY>chunk(20)
                .reader(CountryItemReader(OVERRIDDEN_BY_EXPRESSION))
                .writer(country_writer())
                .build();
    }

    @Bean
    public Job TestJob(Step stepConfig) throws Exception {
        return this.jobBuilderFactory.get("TestJob")
                .incrementer(new RunIdIncrementer()) // because a spring config bug, this incrementer is not really useful
                .start(stepConfig)
                .next(step1(OVERRIDDEN_BY_EXPRESSION))
                .build();
    }
}
But when I add a SimpleAsyncTaskExecutor to the step, I get this error:
org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.CountryItemReader': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:383) ~[spring-beans-5.3.6.jar:5.3.6]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.6.jar:5.3.6]
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.3.6.jar:5.3.6]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:676) ~[spring-aop-5.3.6.jar:5.3.6]
at org.springframework.batch.item.data.MongoItemReader$$EnhancerBySpringCGLIB$443e4.read(<generated>) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.6.jar:5.3.6]
at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.2.jar:4.3.2]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92) ~[spring-batch-core-4.3.2.jar:4.3.2]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:371) ~[spring-beans-5.3.6.jar:5.3.6]
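I tried to solve the issue as described in https://github.com/spring-projects/spring-batch/issues/1335, but it seems to use only one thread in addition to the main thread.
Is there a way to solve this without adding workaround code?
I plan to scale the job on Kubernetes using remote partitioning. Will this problem persist there because of the job scope?
Any ideas or suggestions are welcome.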
I'm trying to share data between steps using JobExecutionContext, which works only if i keep the steps single threaded
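Relying on the execution context to share data between multi-threaded steps is incorrect, because the keys will be overwritten by concurrent threads. The reference documentation explicitly mentions turning off state management in multi-threaded environments: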
- Javadoc:
remember to use saveState=false if used in a multi-threaded client
- Reference doc:
it is not recommended to use job-scoped beans in multi-threaded or partitioned steps
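To make the overwriting problem concrete, here is a minimal plain-Java sketch with no Spring involved. The map is only a stand-in for an execution context, and the class name OverwriteDemo and the key "reader.read.count" are made up for illustration; they are not Spring Batch names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OverwriteDemo {

    // Stand-in for an execution context: one key/value store shared by all worker threads.
    static Map<String, Object> runWorkers(int threads) {
        Map<String, Object> context = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int itemsRead = (i + 1) * 10; // each worker has its own progress
            // Every worker saves its state under the SAME key, like a stateful reader would.
            workers[i] = new Thread(() -> context.put("reader.read.count", itemsRead));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return context;
    }

    public static void main(String[] args) {
        Map<String, Object> context = runWorkers(4);
        // Only one entry is left; which worker's value it holds depends on thread timing.
        System.out.println(context.size() + " entry left: " + context);
    }
}
```

Four workers report progress, but only one value survives, and which one is not predictable. That is why saved state from a multi-threaded step cannot be trusted.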
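That said, I don't see what key could be shared from a multi-threaded step to the next step (since the threads execute in parallel). But if you really need to do that, you should use another approach, such as defining a thread-safe shared bean.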
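As a rough sketch of what such a thread-safe shared bean could look like, assuming the shared data is a small table of values per name (like the "date"/"size" table stored under "COUNTRY" in the question): the class name SharedConfigHolder and its methods are hypothetical, not part of Spring Batch. In a real job it would presumably be registered as a singleton bean and injected into the writer of the first step and the reader of the second.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder; not a Spring Batch class.
public class SharedConfigHolder {

    private final Map<String, Map<String, Object>> tables = new ConcurrentHashMap<>();

    // Stores an immutable snapshot, so later changes to the caller's map are not visible to readers.
    // Note: Map.copyOf rejects null keys and null values.
    public void put(String name, Map<String, Object> table) {
        tables.put(name, Map.copyOf(table));
    }

    // Returns the snapshot for the given name, or an empty map if nothing was stored.
    public Map<String, Object> get(String name) {
        return tables.getOrDefault(name, Map.of());
    }

    public static void main(String[] args) throws InterruptedException {
        SharedConfigHolder holder = new SharedConfigHolder();

        // Single-threaded "config step": publish the table once.
        Map<String, Object> table = new HashMap<>();
        table.put("date", 20210101);
        table.put("size", 250);
        holder.put("COUNTRY", table);

        // Multi-threaded "country step": every worker reads the same snapshot.
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> System.out.println(
                    Thread.currentThread().getName() + " sees date=" + holder.get("COUNTRY").get("date")));
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}
```

Keep in mind that a holder like this lives only in memory: it is not persisted in the job repository, so its contents are lost on restart, and it is not visible to workers running in other JVMs, which matters for remote partitioning.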