MessageChannelPartitionHandler 使用来自 QueueChannel 的第一个可用回复,从而完成错误的工作

MessageChannelPartitionHandler consumes the first available reply from QueueChannel and thus completing wrong job

我遇到了与此处所述相同的问题

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread

我有多个作业 运行,其中的步骤使用下面定义的相同 parent 分区处理程序 'parentPartitionHandler'

<bean id="parentPartitionHandler"class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outboundReplies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="outboundRequests"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

我所有的作业都具有如下类似的配置,其中第 2 步 'studentPartitionAndProcessStep' 是分区步骤

<job id="studentLoadJob" xmlns="http://www.springframework.org/schema/batch"
     job-repository="jobRepository" restartable="true" parent="abstractJob">
    <step id="studentLoadStep" parent="parentLoadStep" next="studentPartitionAndProcessStep"/>
    <step id="studentPartitionAndProcessStep" next="studentCleanupStep">
        <partition partitioner="filePartitioner" handler="studentPartitionHandler"/>
    </step>
    <step id="studentCleanupStep" parent="parentCleanupStep"/>
</job>

<bean id="studentPartitionHandler"
      parent="parentPartitionHandler">
    <property name="stepName" value="studentPartitionStep"/>
</bean>

我使用了这里相同的主配置 https://github.com/mminella/Spring-Batch-Talk-2.0/blob/master/src/main/resources/META-INF/remotePartition.xml

<int:channel id="outboundReplies">
 <int:queue/>
 </int:channel>

<int:channel id="inboundStaging">
</int:channel>

<int:aggregator ref="parentPartitionHandler" send-partial-result-on-expiry="true" send-timeout="60000000"
                input-channel="inboundStaging" output-channel="outboundReplies"
                expire-groups-upon-completion="true"/>

我遇到的问题是聚合器似乎正确地收集了一个组的消息,但在 MessageChannelPartitionHandler 内部,下面的语句接收到第一条可用消息而没有从消息中获取 header 信息。

Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);

因此 PartitionHandler 处理 jobExecutionA 而不是 jobExecutionB 的结果,因此它完成了错误的工作。

MessageChannelPartitionHandler 似乎使用了来自 QueueChannel 的回复(在我的配置中为 outboundReplies),而不考虑 correlationId 而是第一个可用消息。它有时工作,然后它不工作,然后当我调试时,我发现我发生了与 post here

相同的事情

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread

我这里有什么地方做错了吗?
如果你需要,我可以提供更多配置。

编辑:
使用网关一切正常。我也在尝试使用 Adapters 并添加 header-enricher,以便将 replyChannel object 添加到 header。我添加了一个 header-enricher 元素,但我可能使用错误,因为聚合消息处理程序抛出 "no outputChannel or replyChannel header available"。 所有分区请求都在 outboundRequests 通道和 rabbit queue 上发送,其中从站将使用来自 inboundRequestschannel 的请求,服务激活器在其中处理它们并发回以在 outboundStaging 通道上回复 queue。在主机端,聚合器从 inboundStaging channel.Can 读取分区响应消息,你指出在使用适配器时我需要使用 header-enricher 的确切位置。

<int:channel id="outboundRequests">
    <int:dispatcher task-executor="taskExecutor" failover="true"/>
</int:channel>

<int:channel id="inboundStaging"/>
<int:channel id="inboundRequests"/>
<int:channel id="outboundStaging"/>
<int:channel id="setHeaderPartionHandlerReplyChannel"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRequestOutboundGateway"
        channel="outboundRequests"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
        mapped-request-headers="*"
        />


<int-amqp:inbound-channel-adapter
        id="filePartitionRequestInboundGateway"
        concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"
        channel="inboundRequests"
        receive-timeout="60000000"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        />

<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel" output-channel="outboundRequests">
    <int:header-channels-to-string/>
</int:header-enricher>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inboundRequests"
                       output-channel="outboundStaging"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRepyOutboundGateway"
        channel="outboundStaging"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.queuename.reply.routingkey}"
        mapped-request-headers="*"
 />

<int-amqp:inbound-channel-adapter
        id="filePartitionRepyInboundGateway"
        channel="inboundStaging"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.reply}"
        connection-factory="rabbitConnectionFactory"
        concurrent-consumers="${rabbitmq.classflow.batch.tenant.job.consumers}"
        mapped-request-headers="*"
   />

<int:aggregator ref="parentPartitionHandler"
                send-partial-result-on-expiry="true"
                send-timeout="60000000"
                input-channel="inboundStaging"/>

<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="setHeaderPartionHandlerReplyChannel"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

日志记录:

13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.i.p.MessageChannelPartitionHandler - Sending request: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (MessageChannelPartitionHandler.java:222) 
13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334) 
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334) 
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72) 
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (LoggingHandler.java:160) 
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349) 
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72) 
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247) 
13:46:09.570 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247) 
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.c.DefaultHeaderChannelRegistry - Registered org.springframework.integration.channel.QueueChannel@f89f170 as 771064f6-d5ae-4f50-b827-00c225a36c86:1 (DefaultHeaderChannelRegistry.java:167) 
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247) 
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247) 
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:238) 
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - preSend on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334) 
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334) 
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72) 
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (LoggingHandler.java:160) 
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349) 
13:46:09.575 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - postSend (sent=true) on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349) 
13:46:09.575 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72) 
13:46:09.577 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349) 
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://localuser@127.0.0.1:5672/activfounlocal,13) (RabbitTemplate.java:1043) 
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [classflow.topic], routingKey = [partition.request.<em>] (RabbitTemplate.java:1071) 
13:46:09.579 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:184) 
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=</em> (AbstractHeaderMapper.java:240) 
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[contentType] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.<em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.</em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334) 
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.<em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72) 
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.</em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (LoggingHandler.java:160) 
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.<em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:349) 
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@d15c892] received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.</em>, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72) 
13:46:09.599 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'studentPartitionStep' (AbstractBeanFactory.java:247) 
13:46:09.658 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.transaction.config.internalTransactionAdvisor' (AbstractBeanFactory.java:247) 
13:46:09.659 [SimpleAsyncTaskExecutor-1] DEBUG o.s.batch.core.step.AbstractStep - Executing: id=269 (AbstractStep.java:183) 

谢谢
马利卡君

这已在 Spring 批次 3.0.3 中修复,只要您不将特定 replyChannel 注入分区处理程序即可。

spring-batch-integration 现在是主 Spring Batch project.

的一部分

编辑

回应您最后的评论(关于使用适配器而不是网关)。

是;网关负责保持 replyChannel header 完好无损。使用适配器时需要一些额外的配置:

  • 添加一个带有 <header-channels-to-string/> 元素的 header enricher(参见 the Header Channel Registry documentation);这会将实时 header 频道 object 转换为频道注册表的密钥。
  • 配置适配器以映射 replyChannel header(例如 mapped-request-headers="*")。

网关不需要它,因为它在收到回复时保存对出站消息的引用。