Spring 使用 AMQP (Rabbit) 进行批量集成远程分块
Spring batch integrarion remote chunking using AMQP (Rabbit)
我正在尝试按照此处的说明实施 Remote Chunking
http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#externalizing-batch-process-execution。
我的简单大师工作:
<job id="job" restartable="true">
<step id="step1">
<tasklet>
<chunk reader="imageDataReader" writer="itemWriter" commit-interval="1"/>
</tasklet>
</step>
</job>
主配置:
<int:channel id="replies">
<int:queue/>
</int:channel>
<beans:bean class="org.springframework.batch.core.scope.StepScope" />
<amqp:outbound-channel-adapter id="masterOutboundAdapter"
exchange-name="master.slave.exchange"
routing-key="request"
amqp-template="rabbitTemplate"/>
<amqp:inbound-channel-adapter id="masterInboundAdapter"
connection-factory="connectionFactory"
channel="replies"
queue-names="replies"
/>
<beans:bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<beans:property name="defaultChannel" ref="masterOutboundAdapter"/>
<beans:property name="receiveTimeout" value="2000"/>
</beans:bean>
<beans:bean id="itemWriter" scope="step"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter">
<beans:property name="messagingOperations" ref="messagingTemplate"/>
<beans:property name="replyChannel" ref="replies"/>
</beans:bean>
<beans:bean id="chunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
<beans:property name="chunkWriter" ref="itemWriter"/>
<beans:property name="step" ref="step1"/>
</beans:bean>
从机配置:
<int:channel id="replies"/>
<int:channel id="requests">
</int:channel>
<amqp:outbound-channel-adapter id="slaveOutboundAdapter"
exchange-name="master.slave.exchange"
routing-key="reply"
channel="replies"
amqp-template="rabbitTemplate"
/>
<amqp:inbound-channel-adapter id="slaveInboundAdapter"
connection-factory="connectionFactory"
channel="requests"
queue-names="requests"
/>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
requires-reply="true"
method="handleChunk"
/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="xx.yy.ImageDataWriter"/>
</property>
<property name="itemProcessor">
<bean class="xx.yy.ImageDataProcessor"/>
</property>
</bean>
</property>
</bean>
Slave chunk reveice massages,但没有回复并且作业停留在 STARTED 状态
感谢提前。
更新 1
我已添加,mapped-request-headers="*"
到通道适配器和从块回复主块,但主块仍未将作业状态更新为已完成
Slave writer activated, writing:Hello, slave
2015-07-22 19:46:22 DEBUG ChunkProcessorChunkHandler:88 - Completed chunk handling with [StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2015-07-22 19:46:22 DEBUG DirectChannel:278 - preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
2015-07-22 19:46:22 DEBUG AmqpOutboundEndpoint:72 - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedRoutingKey] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedExchange] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryTag] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryMode] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_redelivered] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG RabbitTemplate:1043 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2015-07-22 19:46:22 DEBUG RabbitTemplate:1071 - Publishing message on exchange [master.slave.exchange], routingKey = [reply]
2015-07-22 19:46:22 DEBUG AmqpOutboundEndpoint:112 - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
精通堆栈跟踪
19:50:21,826 DEBUG ThreadPoolAsynchronousRunner:730 - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@32e1664a -- Running DeadlockDetector[Exiting. No pending tasks.]
19:50:21,826 DEBUG BlockingQueueConsumer:365 - Retrieving delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,826 DEBUG QueueChannel:76 - postReceive on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=0, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=ede955c7-6708-42e9-a7f2-b002ed8c0fc8, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583781577}]
19:50:21,826 DEBUG ChunkMessageChannelItemWriter:224 - Found result: ChunkResponse: jobId=2, sequence=0, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true
19:50:21,826 DEBUG ChunkMessageChannelItemWriter:105 - Dispatching chunk: ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1
19:50:21,826 DEBUG DirectChannel:278 - preSend on channel 'masterOutboundAdapter', message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG AmqpOutboundEndpoint:72 - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG RabbitTemplate:1043 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
19:50:21,826 DEBUG RabbitTemplate:1071 - Publishing message on exchange [master.slave.exchange], routingKey = [request]
19:50:21,826 DEBUG AmqpOutboundEndpoint:112 - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG DirectChannel:290 - postSend (sent=true) on channel 'masterOutboundAdapter', message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG ChunkOrientedTasklet:88 - Inputs not busy, ended: false
19:50:21,826 DEBUG TaskletStep:437 - Applying contribution: [StepContribution: read=1, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
19:50:21,826 DEBUG DataSourceTransactionManager:472 - Participating in existing transaction
19:50:21,841 DEBUG JdbcTemplate:908 - Executing prepared SQL update
19:50:21,841 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [UPDATE BATCH_STEP_EXECUTION_CONTEXT SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? WHERE STEP_EXECUTION_ID = ?]
19:50:21,841 DEBUG JdbcTemplate:918 - SQL update affected 1 rows
19:50:21,841 DEBUG TaskletStep:451 - Saving step execution before commit: StepExecution: id=2, version=8, name=step1, status=STARTED, exitStatus=EXECUTING, readCount=8, filterCount=0, writeCount=8 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=8, rollbackCount=0, exitDescription=
19:50:21,841 DEBUG BlockingQueueConsumer:623 - Storing delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,841 DEBUG DataSourceTransactionManager:472 - Participating in existing transaction
19:50:21,841 DEBUG JdbcTemplate:908 - Executing prepared SQL update
19:50:21,841 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [UPDATE BATCH_STEP_EXECUTION set START_TIME = ?, END_TIME = ?, STATUS = ?, COMMIT_COUNT = ?, READ_COUNT = ?, FILTER_COUNT = ?, WRITE_COUNT = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, READ_SKIP_COUNT = ?, PROCESS_SKIP_COUNT = ?, WRITE_SKIP_COUNT = ?, ROLLBACK_COUNT = ?, LAST_UPDATED = ? where STEP_EXECUTION_ID = ? and VERSION = ?]
19:50:21,841 DEBUG BlockingQueueConsumer:339 - Received message: (Body:'ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true'MessageProperties [headers={contentType=application/x-java-serialized-object}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=master.slave.exchange, receivedRoutingKey=reply, deliveryTag=8, messageCount=0])
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedRoutingKey] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedExchange] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryTag] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryMode] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_redelivered] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG QueueChannel:278 - preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=8, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=21950631-84a6-10a5-22f8-f5e7bc681371, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583821857}]
19:50:21,857 DEBUG QueueChannel:290 - postSend (sent=true) on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=8, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=21950631-84a6-10a5-22f8-f5e7bc681371, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583821857}]
19:50:21,857 DEBUG BlockingQueueConsumer:365 - Retrieving delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,857 DEBUG JdbcTemplate:918 - SQL update affected 1 rows
19:50:21,857 DEBUG JdbcTemplate:693 - Executing prepared SQL query
19:50:21,857 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [SELECT VERSION FROM BATCH_JOB_EXECUTION WHERE JOB_EXECUTION_ID=?]
19:50:21,857 DEBUG DataSourceTransactionManager:755 - Initiating transaction commit
19:50:21,857 DEBUG DataSourceTransactionManager:269 - Committing JDBC transaction on Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@3d53e876]
19:50:21,873 DEBUG DataSourceTransactionManager:327 - Releasing JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@3d53e876] after transaction
19:50:21,873 DEBUG DataSourceUtils:327 - Returning JDBC Connection to DataSource
19:50:21,873 DEBUG ThreadPoolAsynchronousRunner:236 - com.mchange.v2.async.ThreadPoolAsynchronousRunner@65a48602: Adding task to queue -- com.mchange.v2.resourcepool.BasicResourcePoolRefurbishCheckinResourceTask@70a24f9
19:50:21,873 DEBUG BasicResourcePool:1747 - trace com.mchange.v2.resourcepool.BasicResourcePool@1f11f64e [managed: 3, unused: 2, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@3b64d775)
19:50:21,873 DEBUG RepeatTemplate:366 - Repeat operation about to start at count=9
19:50:21,873 DEBUG StepContextRepeatCallback:68 - Preparing chunk execution for StepContext: org.springframework.batch.core.scope.context.StepContext@6f8ac8d5
19:50:21,873 DEBUG StepContextRepeatCallback:76 - Chunk execution starting: queue size=0
19:50:21,873 DEBUG DataSourceTransactionManager:367 - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
19:50:21,873 DEBUG BasicResourcePool:1747 - trace com.mchange.v2.resourcepool.BasicResourcePool@1f11f64e [managed: 3, unused: 2, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@3b64d775)
19:50:21,873 DEBUG DataSourceTransactionManager:206 - Acquired Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@26101efc] for JDBC transaction
19:50:21,873 DEBUG DataSourceTransactionManager:223 - Switching JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@26101efc] to manual commit
19:50:21,873 DEBUG RepeatTemplate:464 - Starting repeat context.
19:50:21,873 DEBUG RepeatTemplate:366 - Repeat operation about to start at count=1
19:50:21,873 DEBUG RepeatTemplate:437 - Repeat is complete according to policy and result value.
P.S。抱歉我的英语不好
尝试将 mapped-request-headers="*"
添加到通道适配器(主从)。 JMS 默认映射所有头信息; ampq 适配器没有。
如果这没有帮助;打开 DEBUG 日志记录并在两侧的流程中跟踪消息。
如果您无法从中找出答案,post 某个地方的日志。
我正在尝试按照此处的说明实施 Remote Chunking
http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#externalizing-batch-process-execution。
我的简单大师工作:
<job id="job" restartable="true">
<step id="step1">
<tasklet>
<chunk reader="imageDataReader" writer="itemWriter" commit-interval="1"/>
</tasklet>
</step>
</job>
主配置:
<int:channel id="replies">
<int:queue/>
</int:channel>
<beans:bean class="org.springframework.batch.core.scope.StepScope" />
<amqp:outbound-channel-adapter id="masterOutboundAdapter"
exchange-name="master.slave.exchange"
routing-key="request"
amqp-template="rabbitTemplate"/>
<amqp:inbound-channel-adapter id="masterInboundAdapter"
connection-factory="connectionFactory"
channel="replies"
queue-names="replies"
/>
<beans:bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<beans:property name="defaultChannel" ref="masterOutboundAdapter"/>
<beans:property name="receiveTimeout" value="2000"/>
</beans:bean>
<beans:bean id="itemWriter" scope="step"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter">
<beans:property name="messagingOperations" ref="messagingTemplate"/>
<beans:property name="replyChannel" ref="replies"/>
</beans:bean>
<beans:bean id="chunkHandler"
class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
<beans:property name="chunkWriter" ref="itemWriter"/>
<beans:property name="step" ref="step1"/>
</beans:bean>
从机配置:
<int:channel id="replies"/>
<int:channel id="requests">
</int:channel>
<amqp:outbound-channel-adapter id="slaveOutboundAdapter"
exchange-name="master.slave.exchange"
routing-key="reply"
channel="replies"
amqp-template="rabbitTemplate"
/>
<amqp:inbound-channel-adapter id="slaveInboundAdapter"
connection-factory="connectionFactory"
channel="requests"
queue-names="requests"
/>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
requires-reply="true"
method="handleChunk"
/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="xx.yy.ImageDataWriter"/>
</property>
<property name="itemProcessor">
<bean class="xx.yy.ImageDataProcessor"/>
</property>
</bean>
</property>
</bean>
Slave chunk reveice massages,但没有回复并且作业停留在 STARTED 状态
感谢提前。
更新 1
我已添加,mapped-request-headers="*"
到通道适配器和从块回复主块,但主块仍未将作业状态更新为已完成
Slave writer activated, writing:Hello, slave
2015-07-22 19:46:22 DEBUG ChunkProcessorChunkHandler:88 - Completed chunk handling with [StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
2015-07-22 19:46:22 DEBUG DirectChannel:278 - preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
2015-07-22 19:46:22 DEBUG AmqpOutboundEndpoint:72 - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedRoutingKey] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedExchange] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryTag] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryMode] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_redelivered] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
2015-07-22 19:46:22 DEBUG RabbitTemplate:1043 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2015-07-22 19:46:22 DEBUG RabbitTemplate:1071 - Publishing message on exchange [master.slave.exchange], routingKey = [reply]
2015-07-22 19:46:22 DEBUG AmqpOutboundEndpoint:112 - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=491, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=request, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=492, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=requests, amqp_redelivered=false, id=c2ff7d5b-a8dc-88ba-43fa-d6e9cd068ba5, amqp_consumerTag=amq.ctag-YDnMhHBntWQttsQ-0ILlNQ, contentType=application/x-java-serialized-object, timestamp=1437583582347}]
精通堆栈跟踪
19:50:21,826 DEBUG ThreadPoolAsynchronousRunner:730 - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@32e1664a -- Running DeadlockDetector[Exiting. No pending tasks.]
19:50:21,826 DEBUG BlockingQueueConsumer:365 - Retrieving delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,826 DEBUG QueueChannel:76 - postReceive on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=0, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=ede955c7-6708-42e9-a7f2-b002ed8c0fc8, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583781577}]
19:50:21,826 DEBUG ChunkMessageChannelItemWriter:224 - Found result: ChunkResponse: jobId=2, sequence=0, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true
19:50:21,826 DEBUG ChunkMessageChannelItemWriter:105 - Dispatching chunk: ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1
19:50:21,826 DEBUG DirectChannel:278 - preSend on channel 'masterOutboundAdapter', message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG AmqpOutboundEndpoint:72 - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG RabbitTemplate:1043 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
19:50:21,826 DEBUG RabbitTemplate:1071 - Publishing message on exchange [master.slave.exchange], routingKey = [request]
19:50:21,826 DEBUG AmqpOutboundEndpoint:112 - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG DirectChannel:290 - postSend (sent=true) on channel 'masterOutboundAdapter', message: GenericMessage [payload=ChunkRequest: jobId=2, sequence=7, contribution=[StepContribution: read=0, written=0, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], item count=1, headers={id=44e5f7bc-a00f-fb81-a68d-84a44e27c0ac, timestamp=1437583821826}]
19:50:21,826 DEBUG ChunkOrientedTasklet:88 - Inputs not busy, ended: false
19:50:21,826 DEBUG TaskletStep:437 - Applying contribution: [StepContribution: read=1, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING]
19:50:21,826 DEBUG DataSourceTransactionManager:472 - Participating in existing transaction
19:50:21,841 DEBUG JdbcTemplate:908 - Executing prepared SQL update
19:50:21,841 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [UPDATE BATCH_STEP_EXECUTION_CONTEXT SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? WHERE STEP_EXECUTION_ID = ?]
19:50:21,841 DEBUG JdbcTemplate:918 - SQL update affected 1 rows
19:50:21,841 DEBUG TaskletStep:451 - Saving step execution before commit: StepExecution: id=2, version=8, name=step1, status=STARTED, exitStatus=EXECUTING, readCount=8, filterCount=0, writeCount=8 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=8, rollbackCount=0, exitDescription=
19:50:21,841 DEBUG BlockingQueueConsumer:623 - Storing delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,841 DEBUG DataSourceTransactionManager:472 - Participating in existing transaction
19:50:21,841 DEBUG JdbcTemplate:908 - Executing prepared SQL update
19:50:21,841 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [UPDATE BATCH_STEP_EXECUTION set START_TIME = ?, END_TIME = ?, STATUS = ?, COMMIT_COUNT = ?, READ_COUNT = ?, FILTER_COUNT = ?, WRITE_COUNT = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, READ_SKIP_COUNT = ?, PROCESS_SKIP_COUNT = ?, WRITE_SKIP_COUNT = ?, ROLLBACK_COUNT = ?, LAST_UPDATED = ? where STEP_EXECUTION_ID = ? and VERSION = ?]
19:50:21,841 DEBUG BlockingQueueConsumer:339 - Received message: (Body:'ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true'MessageProperties [headers={contentType=application/x-java-serialized-object}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=master.slave.exchange, receivedRoutingKey=reply, deliveryTag=8, messageCount=0])
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedRoutingKey] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_receivedExchange] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryTag] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_deliveryMode] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[amqp_redelivered] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG AbstractHeaderMapper$HeaderMatcher:374 - headerName=[contentType] WILL be mapped, found in [amqp_appId, amqp_clusterId, amqp_contentEncoding, amqp_contentLength, contentType, amqp_correlationId, amqp_deliveryMode, amqp_deliveryTag, amqp_expiration, amqp_messageCount, amqp_messageId, amqp_receivedExchange, amqp_receivedRoutingKey, amqp_redelivered, amqp_replyTo, amqp_timestamp, amqp_type, amqp_userId, json__TypeId__, json__ContentTypeId__, json__KeyTypeId__, amqp_springReplyCorrelation, amqp_springReplyToStack]
19:50:21,857 DEBUG QueueChannel:278 - preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=8, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=21950631-84a6-10a5-22f8-f5e7bc681371, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583821857}]
19:50:21,857 DEBUG QueueChannel:290 - postSend (sent=true) on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=2, sequence=7, stepContribution=[StepContribution: read=0, written=1, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={amqp_receivedRoutingKey=reply, amqp_receivedExchange=master.slave.exchange, amqp_deliveryTag=8, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=replies, amqp_redelivered=false, id=21950631-84a6-10a5-22f8-f5e7bc681371, amqp_consumerTag=amq.ctag-fG60L6zfXE_GHWqNjPQskw, contentType=application/x-java-serialized-object, timestamp=1437583821857}]
19:50:21,857 DEBUG BlockingQueueConsumer:365 - Retrieving delivery for Consumer: tags=[{amq.ctag-fG60L6zfXE_GHWqNjPQskw=replies}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
19:50:21,857 DEBUG JdbcTemplate:918 - SQL update affected 1 rows
19:50:21,857 DEBUG JdbcTemplate:693 - Executing prepared SQL query
19:50:21,857 DEBUG JdbcTemplate:627 - Executing prepared SQL statement [SELECT VERSION FROM BATCH_JOB_EXECUTION WHERE JOB_EXECUTION_ID=?]
19:50:21,857 DEBUG DataSourceTransactionManager:755 - Initiating transaction commit
19:50:21,857 DEBUG DataSourceTransactionManager:269 - Committing JDBC transaction on Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@3d53e876]
19:50:21,873 DEBUG DataSourceTransactionManager:327 - Releasing JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@3d53e876] after transaction
19:50:21,873 DEBUG DataSourceUtils:327 - Returning JDBC Connection to DataSource
19:50:21,873 DEBUG ThreadPoolAsynchronousRunner:236 - com.mchange.v2.async.ThreadPoolAsynchronousRunner@65a48602: Adding task to queue -- com.mchange.v2.resourcepool.BasicResourcePoolRefurbishCheckinResourceTask@70a24f9
19:50:21,873 DEBUG BasicResourcePool:1747 - trace com.mchange.v2.resourcepool.BasicResourcePool@1f11f64e [managed: 3, unused: 2, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@3b64d775)
19:50:21,873 DEBUG RepeatTemplate:366 - Repeat operation about to start at count=9
19:50:21,873 DEBUG StepContextRepeatCallback:68 - Preparing chunk execution for StepContext: org.springframework.batch.core.scope.context.StepContext@6f8ac8d5
19:50:21,873 DEBUG StepContextRepeatCallback:76 - Chunk execution starting: queue size=0
19:50:21,873 DEBUG DataSourceTransactionManager:367 - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
19:50:21,873 DEBUG BasicResourcePool:1747 - trace com.mchange.v2.resourcepool.BasicResourcePool@1f11f64e [managed: 3, unused: 2, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@3b64d775)
19:50:21,873 DEBUG DataSourceTransactionManager:206 - Acquired Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@26101efc] for JDBC transaction
19:50:21,873 DEBUG DataSourceTransactionManager:223 - Switching JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@26101efc] to manual commit
19:50:21,873 DEBUG RepeatTemplate:464 - Starting repeat context.
19:50:21,873 DEBUG RepeatTemplate:366 - Repeat operation about to start at count=1
19:50:21,873 DEBUG RepeatTemplate:437 - Repeat is complete according to policy and result value.
P.S。抱歉我的英语不好
尝试将 mapped-request-headers="*"
添加到通道适配器(主从)。 JMS 默认映射所有头信息; ampq 适配器没有。
如果这没有帮助;打开 DEBUG 日志记录并在两侧的流程中跟踪消息。
如果您无法从中找出答案,post 某个地方的日志。