How to run a step in a loop in Spring Batch: Updated
I have the following Spring Batch job:
<job id="MyBatchJob" job-repository="jobRepository">
<step id="ConfigurationReadStep">
<tasklet ref="ConfigurationReadTasklet" transaction-manager="jobRepository-transactionManager"/>
<next on="COMPLETED" to="NextStep" />
</step>
<step id="NextStep">
<tasklet transaction-manager="jobRepository-transactionManager">
<chunk reader="myItemReader" writer="myItemWriter" commit-interval="1000"/>
</tasklet>
</step>
<listeners>
<listener ref="jobListener" />
</listeners>
</job>
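Its first step is a configuration read step where, after some business logic, I end up with a list of queries, say 10 of them. I know I can promote this list to the next step using the JobExecutionContext and a PromotionListener.
However, I want to feed these queries one at a time to the next step's reader as its reader query, and run that step in a loop until all reader queries have been consumed. I want to run each query as a Spring Batch step because each one may return a large number of items.
How can I do this?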
************************************** UPDATE **************************************
This is what I am trying to do:
<job id="MyJob" job-repository="jobRepository">
<step id="ConfigurationReadStep">
<tasklet ref="ConfigurationReadTasklet" transaction-manager="jobRepository-transactionManager"/>
<next on="COMPLETED" to="MyNextStep" />
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="MyNextStep" next="limitDecision">
<tasklet transaction-manager="jobRepository-transactionManager">
<chunk reader="MyItemReader" writer="MyItemWriter" commit-interval="1000"/>
</tasklet>
</step>
<decision id="limitDecision" decider="limitDecider">
<next on="CONTINUE" to="MyNextStep" />
<end on="COMPLETED" />
</decision>
<listeners>
<listener ref="jobListener" />
</listeners>
</job>
<beans:bean id="jobListener" class="com.hsbc.gbm.dml.integration.batch.listener.SimpleJobListener" />
<beans:bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<beans:property name="keys" value="readerLimit,readerQueries,readerSQL" />
</beans:bean>
<beans:bean id="ConfigurationReadTasklet" class="com.mypackage.ConfigurationReadTasklet" scope="step">
</beans:bean>
<beans:bean id="MyItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<beans:property name="dataSource" ref="myDataSource" />
<beans:property name="sql" value="#{jobExecutionContext['readerSQL']}" />
<beans:property name="rowMapper">
<beans:bean name="mapper" class="com.mypackage.MyRowMapper" />
</beans:property>
</beans:bean>
<beans:bean id="MyItemWriter" class="com.mypackage.MyItemWriter" scope="step">
<beans:constructor-arg ref="myDataSource" />
</beans:bean>
<beans:bean id="limitDecider" class="com.mypackage.StepLoopLimitDecider" scope="step">
<beans:property name="readerQueries" value="#{jobExecutionContext['readerQueries']}" />
</beans:bean>
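In the tasklet, I obtain the list of reader queries. I set readerQuery to the first one, then move on to the next step, which runs as a normal Spring Batch step.
When it completes, I want my Decider to check whether there are more reader queries. If there are, it changes readerQuery to the next query and re-runs NextStep; otherwise the job finishes. Here is my decider: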
public class StepLoopLimitDecider implements JobExecutionDecider {
private List<String> readerQueries;
private int count = 1;
public FlowExecutionStatus decide(JobExecution jobExecution,StepExecution stepExecution) {
if (count >= this.readerQueries.size()) {
return new FlowExecutionStatus("COMPLETED");
}
else {
System.out.println(count + ": "+readerQueries.get(count));
jobExecution.getExecutionContext().put("readerSQL", readerQueries.get(count));
count = count + 1;
return new FlowExecutionStatus("CONTINUE");
}
}
public void setReaderQueries(List<String> readerQueries) {
this.readerQueries = readerQueries;
}
}
However, this does not work. The step runs correctly the first time, but the decider then fails with the following error:
2016-12-06 14:46:24 ERROR AbstractJob:306 - Encountered fatal error executing job
org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:141)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at org.springframework.batch.core.launch.support.CommandLineJobRunner.start(CommandLineJobRunner.java:348)
at org.springframework.batch.core.launch.support.CommandLineJobRunner.main(CommandLineJobRunner.java:565)
Caused by:
org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=MyBatchJob at state=MyBatchJob.limitDecision with exception
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:152)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
... 6 more
Caused by:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.limitDecider': Scope 'step' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for step scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:339)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:190)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:33)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:182)
at $Proxy4.decide(Unknown Source)
at org.springframework.batch.core.job.flow.support.state.DecisionState.handle(DecisionState.java:43)
at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean$DelegateState.handle(SimpleFlowFactoryBean.java:141)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
... 8 more
Caused by:
java.lang.IllegalStateException: No context holder available for step scope
at org.springframework.batch.core.scope.StepScope.getContext(StepScope.java:197)
at org.springframework.batch.core.scope.StepScope.get(StepScope.java:139)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:325)
... 15 more
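Can someone help me get this working?
I would suggest building your job dynamically, based on the result of the business logic that computes the queries.
However, you cannot do that with XML configuration, so I would suggest using the Java config API.
I have written a couple of answers to similar questions on "how to create a job dynamically". Have a look at them and let me know if you need further advice.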
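A note on the stack trace itself: the message "Scope 'step' is not active for the current thread" is raised for limitDecider, which is declared with scope="step". A decision state runs between steps, so no step scope is active when decide() is called. One possible way around this, if you want to keep the decider approach, would be to declare the decider without scope="step" and have it read the query list from jobExecution.getExecutionContext() inside decide(), keeping the loop position in that context rather than in a field. The sketch below only models that decision logic in plain Java so it can be run without Spring Batch on the classpath. The Map stands in for the job ExecutionContext, the returned strings stand in for FlowExecutionStatus values, and the key readerQueryIndex is a made-up name; readerQueries and readerSQL are the keys from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the loop decision (not the Spring Batch API).
// Assumption: in a real decider the Map would be
// jobExecution.getExecutionContext() and the result a FlowExecutionStatus.
class QueryLoopDecision {

    static final String CONTINUE = "CONTINUE";
    static final String COMPLETED = "COMPLETED";

    // Hypothetical key holding the index of the query that has just been run.
    static final String INDEX_KEY = "readerQueryIndex";

    @SuppressWarnings("unchecked")
    static String decide(Map<String, Object> context) {
        List<String> queries = (List<String>) context.get("readerQueries");
        // The tasklet is assumed to have started the loop with query 0.
        int finished = (Integer) context.getOrDefault(INDEX_KEY, 0);
        int next = finished + 1;
        if (queries == null || next >= queries.size()) {
            return COMPLETED; // nothing left to run
        }
        // Publish the next query for the step-scoped reader and remember the position.
        context.put("readerSQL", queries.get(next));
        context.put(INDEX_KEY, next);
        return CONTINUE;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        List<String> queries = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3");
        context.put("readerQueries", queries);
        context.put("readerSQL", queries.get(0)); // what the tasklet would set

        List<String> executed = new ArrayList<>();
        do {
            // Stands in for one run of MyNextStep with the current readerSQL.
            executed.add((String) context.get("readerSQL"));
        } while (CONTINUE.equals(decide(context)));

        System.out.println(executed);
    }
}
```

Keeping the index in the context instead of in a count field means the decider itself holds no state, so a single shared instance does not carry a stale counter from one job execution into the next.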