如何在 batch-int:job-launching-gateway 中 运行 异步批处理作业?
how to run async batch job in batch-int:job-launching-gateway?
首先感谢关注,
我在我的项目中结合了 spring 集成和 spring 批处理,我想在 batch-int:job-launching-gateway
中以异步模式启动作业,我的意思是输入通道中的每条消息启动作业在异步中而不是等待 util 完成作业,我的代码是:
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
更新:
感谢@Nenad 帮助我,我在我的配置中定义了 JobRepository
和 JobLauncher
如下代码:
@Bean
public MapJobRepositoryFactoryBean jobRepositoryFactoryBean() {
MapJobRepositoryFactoryBean fb = new MapJobRepositoryFactoryBean();
return fb;
}
@Bean
public JobLauncher jobLauncher() throws Exception {
final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepositoryFactoryBean().getObject());
final SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
jobLauncher.setTaskExecutor(simpleAsyncTaskExecutor);
return jobLauncher;
}
但没有成功,并抛出以下异常:
Exception in thread "SimpleAsyncTaskExecutor-20" java.lang.NullPointerException
at org.springframework.batch.core.repository.dao.MapJobExecutionDao.synchronizeStatus(MapJobExecutionDao.java:158)
at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:161)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy18.update(Unknown Source)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy18.update(Unknown Source)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:351)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:135)
at java.lang.Thread.run(Thread.java:745)
JobLaunchingMessageHandler
正在使用 JobLauncher
在收到 JobLaunchRequest
后启动作业。您可以在配置中将 JobLauncher
定义为:
@Bean
public JobLauncher jobLauncher() {
final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
final SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
jobLauncher.setTaskExecutor(simpleAsyncTaskExecutor);
return jobLauncher;
}
并且该作业启动器将被注入并用于启动作业,这将导致异步启动。
将 outboundJobRequestChannel
简单地更改为 ExecutorChannel
可能会更容易。参见 the documentation and here。
首先感谢关注,
我在我的项目中结合了 spring 集成和 spring 批处理,我想在 batch-int:job-launching-gateway
中以异步模式启动作业,我的意思是输入通道中的每条消息启动作业在异步中而不是等待 util 完成作业,我的代码是:
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
更新:
感谢@Nenad 帮助我,我在我的配置中定义了 JobRepository
和 JobLauncher
如下代码:
@Bean
public MapJobRepositoryFactoryBean jobRepositoryFactoryBean() {
MapJobRepositoryFactoryBean fb = new MapJobRepositoryFactoryBean();
return fb;
}
@Bean
public JobLauncher jobLauncher() throws Exception {
final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepositoryFactoryBean().getObject());
final SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
jobLauncher.setTaskExecutor(simpleAsyncTaskExecutor);
return jobLauncher;
}
但没有成功,并抛出以下异常:
Exception in thread "SimpleAsyncTaskExecutor-20" java.lang.NullPointerException
at org.springframework.batch.core.repository.dao.MapJobExecutionDao.synchronizeStatus(MapJobExecutionDao.java:158)
at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:161)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy18.update(Unknown Source)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy18.update(Unknown Source)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:351)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:135)
at java.lang.Thread.run(Thread.java:745)
JobLaunchingMessageHandler
正在使用 JobLauncher
在收到 JobLaunchRequest
后启动作业。您可以在配置中将 JobLauncher
定义为:
@Bean
public JobLauncher jobLauncher() {
final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
final SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
jobLauncher.setTaskExecutor(simpleAsyncTaskExecutor);
return jobLauncher;
}
并且该作业启动器将被注入并用于启动作业,这将导致异步启动。
将 outboundJobRequestChannel
简单地更改为 ExecutorChannel
可能会更容易。参见 the documentation and here。