无法并行运行 Spring Batch 步骤

Not able to run Spring Batch Steps in parallel

我正在尝试并行运行多个步骤,但是当不同的流(flow)试图更新 JOB_EXECUTION 统计数据时,后端 SQL Server 数据库中出现了错误(冲突)。我使用了异步任务执行器,但除此之外,我不确定如何让这些流成功运行。

    <!-- Split "createContextMaps": its two flows are executed through the
         "asyncTaskExecutor" bean; afterwards the job continues with "processBWReport". -->
    <batch:split id="createContextMaps" task-executor="asyncTaskExecutor" next="processBWReport">
        <!-- Flow 1: single chunk-oriented step, tabcPermitReader -> tabcPermitContextWriter,
             commit-interval 5000. -->
        <batch:flow>
            <batch:step id="createTabcPermitsMap">
                <batch:tasklet>
                    <batch:chunk reader="tabcPermitReader" writer="tabcPermitContextWriter" commit-interval="5000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
        <!-- Flow 2: single chunk-oriented step, custPermitReader -> custPermitContextWriter,
             commit-interval 5000.
             NOTE(review): the writer names suggest both steps store maps in the execution
             context; if that context is shared between the flows, the stored values must be
             safe for concurrent access (verify against the writer implementations). -->
        <batch:flow>
            <batch:step id="createCustPermitsMap">
                <batch:tasklet>
                    <batch:chunk reader="custPermitReader" writer="custPermitContextWriter" commit-interval="5000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
    </batch:split>

<!-- Task executor bean referenced by the task-executor attribute of the "createContextMaps" split. -->
<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

我得到的异常是 Caused by: java.util.ConcurrentModificationException:

    org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
        at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:143) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) [spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:149) [spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.3.18.jar:5.3.18]
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) [spring-batch-core-4.3.5.jar:4.3.5]
        at com.xxx.batch.domain.comptroller.Launcher.run(Launcher.java:116) [classes/:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_171]
    Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: Ended flow=createContextMaps.0 at state=createContextMaps.0.createTabcPermitsMap with exception
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:178) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: java.lang.IllegalArgumentException: Could not serialize the execution context
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:306) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: java.util.HashMap["CUST_PERMITS"])
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:943) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more
    Caused by: java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextNode(Unknown Source) ~[?:1.8.0_171]
        at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
        at java.util.HashMap$EntryIterator.next(Unknown Source) ~[?:1.8.0_171]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:904) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:941) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:696) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:681) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:650) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:33) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4409) ~[jackson-databind-2.11.4.jar:2.11.4]
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3621) ~[jackson-databind-2.11.4.jar:2.11.4]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:141) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.serialize(Jackson2ExecutionContextStringSerializer.java:104) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.serializeContext(JdbcExecutionContextDao.java:302) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.updateExecutionContext(JdbcExecutionContextDao.java:146) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.repository.support.SimpleJobRepository.updateExecutionContext(SimpleJobRepository.java:223) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_171]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.18.jar:5.3.18]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.18.jar:5.3.18]
        at com.sun.proxy.$Proxy21.updateExecutionContext(Unknown Source) ~[?:?]
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:163) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:94) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at org.springframework.batch.core.job.flow.support.state.SplitState.call(SplitState.java:91) ~[spring-batch-core-4.3.5.jar:4.3.5]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:1.8.0_171]
        ... 1 more

序列化执行上下文时似乎出错了(java.lang.IllegalArgumentException: Could not serialize the execution context),原因是 java.util.ConcurrentModificationException。

由于您的步骤在不同的线程中并行运行,因此您需要确保存储在执行上下文中的所有数据结构都是线程安全且可序列化的。例如,您应该使用 ConcurrentHashMap 而不是 HashMap。此外,由于执行上下文会被序列化并保存在作业存储库中,因此其中所有的值都应实现 Serializable。