MessageChannelPartitionHandler consumes the first available reply from the QueueChannel and thus completes the wrong job
I am facing the same issue as described here:
http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread
I have multiple jobs running whose steps use the same parent partition handler, 'parentPartitionHandler', defined below.
<bean id="parentPartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outboundReplies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outboundRequests"/>
<property name="receiveTimeout" value="60000000"/>
</bean>
</property>
<property name="stepName" value="parentPartitionStep"/>
</bean>
All my jobs have a similar configuration, as shown below, where the second step, 'studentPartitionAndProcessStep', is the partitioned step.
<job id="studentLoadJob" xmlns="http://www.springframework.org/schema/batch"
job-repository="jobRepository" restartable="true" parent="abstractJob">
<step id="studentLoadStep" parent="parentLoadStep" next="studentPartitionAndProcessStep"/>
<step id="studentPartitionAndProcessStep" next="studentCleanupStep">
<partition partitioner="filePartitioner" handler="studentPartitionHandler"/>
</step>
<step id="studentCleanupStep" parent="parentCleanupStep"/>
</job>
<bean id="studentPartitionHandler"
parent="parentPartitionHandler">
<property name="stepName" value="studentPartitionStep"/>
</bean>
I used the same master configuration as here: https://github.com/mminella/Spring-Batch-Talk-2.0/blob/master/src/main/resources/META-INF/remotePartition.xml
<int:channel id="outboundReplies">
<int:queue/>
</int:channel>
<int:channel id="inboundStaging">
</int:channel>
<int:aggregator ref="parentPartitionHandler" send-partial-result-on-expiry="true" send-timeout="60000000"
input-channel="inboundStaging" output-channel="outboundReplies"
expire-groups-upon-completion="true"/>
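The problem I am facing is that the aggregator seems to collect the messages for a group correctly, but inside MessageChannelPartitionHandler the statement below receives the first available message without looking at the message's header information.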
Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);
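As a result, the PartitionHandler processes the results of jobExecutionA instead of jobExecutionB, and so it completes the wrong job.
MessageChannelPartitionHandler seems to consume replies from the QueueChannel (outboundReplies in my configuration) by taking the first available message, without considering the correlationId. It works intermittently, and when I debugged it I found the same thing happening as in the post linked above.
Am I doing anything wrong here?
I can provide more configuration if you need it.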
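Edit:
Everything works fine when using gateways. I am also trying to use adapters and add a header-enricher so that the replyChannel object is added to the header. I added a header-enricher element, but I may be using it incorrectly, because the aggregating message handler throws "no outputChannel or replyChannel header available".
All partition requests are sent on the outboundRequests channel to a rabbit queue. The slaves consume the requests from the inboundRequests channel, where a service activator processes them and sends the replies back on the outboundStaging channel to the reply queue. On the master side, the aggregator reads the partition response messages from the inboundStaging channel. Can you point out exactly where I need to use the header-enricher when using adapters?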
<int:channel id="outboundRequests">
<int:dispatcher task-executor="taskExecutor" failover="true"/>
</int:channel>
<int:channel id="inboundStaging"/>
<int:channel id="inboundRequests"/>
<int:channel id="outboundStaging"/>
<int:channel id="setHeaderPartionHandlerReplyChannel"/>
<int-amqp:outbound-channel-adapter
id="filePartitionRequestOutboundGateway"
channel="outboundRequests"
amqp-template="rabbitTemplate"
exchange-name="${rabbitmq.classflow.exchange}"
routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
mapped-request-headers="*"
/>
<int-amqp:inbound-channel-adapter
id="filePartitionRequestInboundGateway"
concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"
channel="inboundRequests"
receive-timeout="60000000"
queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="*"
/>
<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel" output-channel="outboundRequests">
<int:header-channels-to-string/>
</int:header-enricher>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inboundRequests"
output-channel="outboundStaging"/>
<int-amqp:outbound-channel-adapter
id="filePartitionRepyOutboundGateway"
channel="outboundStaging"
amqp-template="rabbitTemplate"
exchange-name="${rabbitmq.classflow.exchange}"
routing-key="${rabbitmq.classflow.batch.partition.queuename.reply.routingkey}"
mapped-request-headers="*"
/>
<int-amqp:inbound-channel-adapter
id="filePartitionRepyInboundGateway"
channel="inboundStaging"
queue-names="${rabbitmq.classflow.batch.partition.queuename.reply}"
connection-factory="rabbitConnectionFactory"
concurrent-consumers="${rabbitmq.classflow.batch.tenant.job.consumers}"
mapped-request-headers="*"
/>
<int:aggregator ref="parentPartitionHandler"
send-partial-result-on-expiry="true"
send-timeout="60000000"
input-channel="inboundStaging"/>
<bean id="parentPartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="gridSize" value="3"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="setHeaderPartionHandlerReplyChannel"/>
<property name="receiveTimeout" value="60000000"/>
</bean>
</property>
<property name="stepName" value="parentPartitionStep"/>
</bean>
Logging:
13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.i.p.MessageChannelPartitionHandler - Sending request: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (MessageChannelPartitionHandler.java:222)
13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334)
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334)
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (LoggingHandler.java:160)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247)
13:46:09.570 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.c.DefaultHeaderChannelRegistry - Registered org.springframework.integration.channel.QueueChannel@f89f170 as 771064f6-d5ae-4f50-b827-00c225a36c86:1 (DefaultHeaderChannelRegistry.java:167)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247)
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:238)
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - preSend on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (LoggingHandler.java:160)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - postSend (sent=true) on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72)
13:46:09.577 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://localuser@127.0.0.1:5672/activfounlocal,13) (RabbitTemplate.java:1043)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [classflow.topic], routingKey = [partition.request.*] (RabbitTemplate.java:1071)
13:46:09.579 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:184)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[contentType] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (LoggingHandler.java:160)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:349)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@d15c892] received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.599 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'studentPartitionStep' (AbstractBeanFactory.java:247)
13:46:09.658 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.transaction.config.internalTransactionAdvisor' (AbstractBeanFactory.java:247)
13:46:09.659 [SimpleAsyncTaskExecutor-1] DEBUG o.s.batch.core.step.AbstractStep - Executing: id=269 (AbstractStep.java:183)
Thanks,
马利卡君
This was fixed in Spring Batch 3.0.3, as long as you don't inject a specific replyChannel into the partition handler.
spring-batch-integration is now part of the main Spring Batch project.
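A minimal sketch of what that could look like for the configuration in the question, assuming Spring Batch 3.0.3 or later. The replyChannel property is dropped from the handler and the aggregator has no output-channel, so the aggregated result would be routed by the replyChannel header that the handler sets on each request (it shows up in the question's log as a QueueChannel). Bean and channel names are taken from the question; this is an illustration, not a verified configuration.

```xml
<!-- Sketch only: partition handler WITHOUT an injected replyChannel. -->
<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <!-- no <property name="replyChannel" .../> here -->
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundRequests"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

<!-- No output-channel: the aggregated reply goes to the replyChannel header
     carried by the messages, rather than to one shared queue channel. -->
<int:aggregator ref="parentPartitionHandler"
                send-partial-result-on-expiry="true"
                send-timeout="60000000"
                input-channel="inboundStaging"/>
```

With a reply channel per request, two concurrent jobs no longer compete for the first message on a shared outboundReplies queue.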
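Edit
In response to your last comment (about using adapters instead of gateways):
Yes; the gateway takes care of keeping the replyChannel header intact. When using adapters, some additional configuration is needed: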
- Add a header enricher with a <header-channels-to-string/> element (see the Header Channel Registry documentation); this converts the live channel object in the header into a key in the channel registry.
- Configure the adapters to map the replyChannel header (e.g. mapped-request-headers="*").
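The two steps above can be sketched for the master's request path as follows. Channel names, property placeholders and the rabbitTemplate bean are reused from the question; treat the fragment as an assumption-laden illustration rather than a tested configuration.

```xml
<!-- Step 1: turn the live replyChannel object into a registry key (a String)
     before the message leaves the JVM. The partition handler's
     MessagingTemplate sends to this channel first. -->
<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel"
                     output-channel="outboundRequests">
    <int:header-channels-to-string/>
</int:header-enricher>

<!-- Step 2: let the adapter map headers onto the AMQP message. -->
<int-amqp:outbound-channel-adapter
    channel="outboundRequests"
    amqp-template="rabbitTemplate"
    exchange-name="${rabbitmq.classflow.exchange}"
    routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
    mapped-request-headers="*"/>
```

The same header mapping would be needed on every adapter the message passes through (slave inbound, slave outbound, master inbound) so that the key is still present when the aggregator resolves it. Note that the question's log lists only correlationId, sequenceSize and sequenceNumber as mapped on the outbound side, so it is worth checking in your own DEBUG output that replyChannel actually crosses the broker.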
The gateway doesn't need this because it holds a reference to the outbound message when the reply is received.
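For comparison, a gateway-based wiring might look roughly like the sketch below. The question states that gateways work but does not show that configuration, so every element here is an assumption: the channel names and placeholders are borrowed from the adapter configuration, and the reply timeout on rabbitTemplate would have to be long enough for a partition to finish.

```xml
<!-- Master side (sketch): the outbound gateway sends the request and waits for
     the reply; the reply is built from the original request message, so the
     live replyChannel header is still present when it reaches the aggregator. -->
<int-amqp:outbound-gateway
    request-channel="outboundRequests"
    reply-channel="inboundStaging"
    amqp-template="rabbitTemplate"
    exchange-name="${rabbitmq.classflow.exchange}"
    routing-key="${rabbitmq.classflow.batch.partition.routingkey}"/>

<!-- Slave side (sketch): the inbound gateway returns whatever the service
     activator produces to the requesting master. -->
<int-amqp:inbound-gateway
    request-channel="inboundRequests"
    queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
    connection-factory="rabbitConnectionFactory"
    concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"/>

<!-- No output-channel: the result goes back through the inbound gateway. -->
<int:service-activator ref="stepExecutionRequestHandler"
                       input-channel="inboundRequests"/>
```

Because each outbound gateway call blocks until its reply arrives, the executor-backed outboundRequests channel from the question is what would let the partitions be dispatched in parallel.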
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (LoggingHandler.java:160)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - postSend (sent=true) on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72)
13:46:09.577 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://localuser@127.0.0.1:5672/activfounlocal,13) (RabbitTemplate.java:1043)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [classflow.topic], routingKey = [partition.request.*] (RabbitTemplate.java:1071)
13:46:09.579 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:184)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[contentType] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (LoggingHandler.java:160)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:349)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@d15c892] received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request.*, amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.599 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'studentPartitionStep' (AbstractBeanFactory.java:247)
13:46:09.658 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.transaction.config.internalTransactionAdvisor' (AbstractBeanFactory.java:247)
13:46:09.659 [SimpleAsyncTaskExecutor-1] DEBUG o.s.batch.core.step.AbstractStep - Executing: id=269 (AbstractStep.java:183)
Thanks,
Mallikarjun
This was fixed in Spring Batch 3.0.3, as long as you do not inject a specific replyChannel into the partition handler.
spring-batch-integration is now part of the main Spring Batch project.
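As a rough illustration of "not injecting a specific replyChannel", the handler from the question could be declared without that property, as sketched below. This assumes Spring Batch 3.0.3 or later and assumes that the handler then supplies its own reply channel per step execution (the replyChannel=QueueChannel@... header in the log above suggests one is attached to each request). The aggregator in the question also sends to the shared outboundReplies queue through its output-channel attribute; whether that attribute has to be dropped so that replies follow the replyChannel header is something to verify against your versions, not a confirmed requirement.

```xml
<!-- Sketch only: the parentPartitionHandler from the question, minus the replyChannel property. -->
<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <!-- No <property name="replyChannel" .../> here, so no queue is shared between jobs. -->
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundRequests"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>
```

Child handlers such as studentPartitionHandler would inherit this definition unchanged and only override stepName.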
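EDIT
In response to your last comment (about using adapters instead of a gateway):
Yes; the gateway takes care of keeping the replyChannel header intact. With adapters, some extra configuration is needed:
- Add a header enricher with a <header-channels-to-string/> element (see the Header Channel Registry documentation); this converts the live channel object in the header into a key in the channel registry.
- Configure the adapters to map the replyChannel header (e.g. mapped-request-headers="*").
The gateway does not need this because it holds a reference to the outbound message when the reply is received.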
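The two adapter-side steps above might look roughly like the following on the master. This is a sketch under assumptions, not the poster's actual configuration: the partitionRequests channel, the amqpTemplate and rabbitConnectionFactory bean names, the routing key, and the reply queue name are all placeholders. Only classflow.topic, outboundRequests, and inboundStaging come from the question and its log.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">

    <!-- Hypothetical entry channel: the handler's MessagingTemplate would send here first. -->
    <int:channel id="partitionRequests"/>

    <!-- Step 1: replace the live replyChannel object in the header with a registry key (a String). -->
    <int:header-enricher input-channel="partitionRequests" output-channel="outboundRequests">
        <int:header-channels-to-string/>
    </int:header-enricher>

    <!-- Step 2a: map all headers, including replyChannel, onto the outgoing AMQP message. -->
    <int-amqp:outbound-channel-adapter channel="outboundRequests"
                                       amqp-template="amqpTemplate"
                                       exchange-name="classflow.topic"
                                       routing-key="partition.request"
                                       mapped-request-headers="*"/>

    <!-- Step 2b: map the headers back when replies arrive, so replyChannel reaches the aggregator. -->
    <int-amqp:inbound-channel-adapter channel="inboundStaging"
                                      queue-names="partition.reply.queue"
                                      connection-factory="rabbitConnectionFactory"
                                      mapped-request-headers="*"/>
</beans>
```

The adapters on the worker side would presumably need the same header mapping, since the replyChannel key has to survive the full round trip before the registry can resolve it back to the original channel.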