Why does the client get a 500 response error although the server doesn't experience any errors in spring-integration-http?
I have the following server configuration:
@Configuration
@EnableIntegration
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class))
                .enrich(enricherSpec -> {
                    enricherSpec.header("correlationId", 1); //ackCorrelationId
                })
                .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
                .log()
                //.barrier(1000L)
                .log()
                .handle(Amqp.outboundAdapter(amqpTemplate())
                        .exchangeName("barrierExchange")
                        .routingKey("barrierKey")
                        .confirmAckChannel(confirmAckChannel())
                        .confirmCorrelationExpression("payload")
                )
                .get();
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost("localhost");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpTemplate amqpTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public DirectChannel confirmAckChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackChannelListener() {
        return IntegrationFlows.from(confirmAckChannel())
                .handle(m -> {
                    System.out.println("ACK:" + m);
                })
                .get();
    }
}
And the following client configuration:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(consoleSource(), consoleConsumer())
                .handle(httpOutboundGateway())
                .log()
                .channel("httpRequestChannel")
                .handle(s -> {
                    System.out.println("We got response: " + s);
                })
                .get();
    }

    private HttpMessageHandlerSpec httpOutboundGateway() {
        return Http.outboundGateway("http://localhost:8080/spring_integration_post") //http://localhost:8080/my_post
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class);
    }

    private Consumer<SourcePollingChannelAdapterSpec> consoleConsumer() {
        return c -> c.poller(Pollers.fixedRate(1000)
                .maxMessagesPerPoll(1));
    }

    public MessageSource<String> consoleSource() {
        return CharacterStreamReadingMessageSource.stdin();
    }
}
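I send a,v,b from the client.
I see that the server accepts the message, sends 3 messages to RabbitMQ (in the rabbit admin I can see that the messages really are accepted by rabbit) and gets 3 acks: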
ACK:GenericMessage [payload=a, headers={amqp_publishConfirm=true, id=eb8fd94b-5721-8b3b-5219-b7a4e0d950a8, timestamp=1567062603387}]
ACK:GenericMessage [payload=v, headers={amqp_publishConfirm=true, id=724b7def-1ab2-79b9-c788-27f6cbe24a33, timestamp=1567062603388}]
ACK:GenericMessage [payload=b, headers={amqp_publishConfirm=true, id=12a10799-d664-64dc-dd9e-1601fda61977, timestamp=1567062603389}]
But the client prints the following trace:
2019-08-29 10:10:04.392 ERROR 18404 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=a,v,b, headers={id=0b902e6f-d4de-329f-9916-19f94cdedc4e, timestamp=1567062603152}]
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
... 30 more
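Why does this happen?
How can I fix it?
P.S.
As @Artem Bilan said, my problem was that I didn't send any response to the client, so the client ran into a timeout error. I would say the error is really confusing; I would expect a 504 error for a timeout.
After all the messages had been sent to rabbit, I tried to return something to the client, so I wrote the following configuration: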
@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            .handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey")
                    .confirmAckChannel(confirmAckChannel())
                    .confirmCorrelationFunction(Message::getPayload)
            ).handle((payload, headers) -> {
                System.out.println("Before aggregation");
                return true;
            })
            .aggregate()
            .handle((payload, headers) -> {
                System.out.println("After aggregation");
                return true;
            }).get();
}
But on the client I see the same stack trace. On the server side I also see this trace:
2019-08-30 12:19:10.166 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.167 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.172 INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2019-08-30 12:19:10.198 INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#123d7057:0/SimpleConnection@2ae574e5 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63045]
2019-08-30 12:19:10.224 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
2019-08-30 12:19:10.225 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
ACK:GenericMessage [payload=A, headers={amqp_publishConfirm=true, id=5d85f459-661d-69c4-4d36-b5843289b41b, timestamp=1567156750227}]
2019-08-30 12:19:10.227 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
2019-08-30 12:19:10.228 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
ACK:GenericMessage [payload=B, headers={amqp_publishConfirm=true, id=1136d70b-8dc5-2397-f936-0246c0ad8073, timestamp=1567156750231}]
ACK:GenericMessage [payload=C, headers={amqp_publishConfirm=true, id=1b74d944-ec4c-75c2-83f5-3b36a8e89a39, timestamp=1567156750232}]
This means my 2 handlers were not invoked at all. Why?
Update
I was also able to apply the suggestion related to publish-subscribe:
@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            // .enrich(enricherSpec -> {
            //     enricherSpec.header("correlationId", 1); //ackCorrelationId
            // })
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            //.barrier(1000L)
            .log()
            .publishSubscribeChannel(publishSubscribeSpec -> {
                publishSubscribeSpec.applySequence(true);
                publishSubscribeSpec.subscribe(f -> Amqp.outboundAdapter(amqpTemplate())
                        .exchangeName("barrierExchange")
                        .routingKey("barrierKey")
                        .confirmAckChannel(confirmAckChannel())
                        .confirmCorrelationFunction(Message::getPayload));
                publishSubscribeSpec.subscribe(flow -> {
                    flow.handle((p, h) -> "from server: " + p);
                });
            })
            .get();
}
The error disappeared, but the client gets only the last split part:
1,2,3,4,5
2019-08-30 12:48:24.570 INFO 19476 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler : GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
We got response: GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
As far as I understand, in this case the 2 subflows are independent, and for some reason only the last part is sent to the client....
Your problem is here:
        .handle(Amqp.outboundAdapter(amqpTemplate())
                .exchangeName("barrierExchange")
                .routingKey("barrierKey")
                .confirmAckChannel(confirmAckChannel())
                .confirmCorrelationExpression("payload")
        )
        .get();
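You simply end your flow there, while it starts from Http.inboundGateway(), which expects some reply to be sent back. Using Amqp.outboundAdapter() makes the flow one-way, so no reply is sent back to the waiting inbound gateway.
Since no reply is sent back to the client, the request ends with a request timeout error, indicating that the server did not send a reply in time.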
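The effect described above can be modelled without Spring at all. The following is a minimal plain-Java sketch, not Spring Integration's actual implementation: the class GatewayReplySketch, its method sendAndReceive and the two flows ONE_WAY and REQUEST_REPLY are hypothetical names introduced only for illustration. The "gateway" hands the request to a flow and then waits a limited time on a reply channel; a flow that only publishes and never replies leaves the gateway with nothing to return.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical model of a request/reply gateway; all names are illustrative.
final class GatewayReplySketch {

    // Models a flow that ends in a one-way adapter: it consumes the payload and never replies.
    static final BiConsumer<String, BlockingQueue<String>> ONE_WAY =
            (payload, replyChannel) -> { /* publish somewhere, send nothing back */ };

    // Models a flow that produces a reply for the waiting gateway.
    static final BiConsumer<String, BlockingQueue<String>> REQUEST_REPLY =
            (payload, replyChannel) -> replyChannel.offer("Processed OK: " + payload);

    // Sends the request into the flow and waits up to timeoutMillis for a reply.
    // Returns null when no reply arrives in time.
    static String sendAndReceive(String request,
                                 BiConsumer<String, BlockingQueue<String>> flow,
                                 long timeoutMillis) {
        BlockingQueue<String> replyChannel = new ArrayBlockingQueue<>(1);
        flow.accept(request, replyChannel);
        try {
            return replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive("a,v,b", ONE_WAY, 200));       // no reply: null
        System.out.println(sendAndReceive("a,v,b", REQUEST_REPLY, 200)); // a reply arrives
    }
}
```

In this model the one-way flow does all of its work successfully and the caller still ends up empty-handed, which mirrors the situation in the question: the messages reach RabbitMQ, yet the HTTP client gets an error.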
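That is the answer to your situation. How to fix it is out of the scope of this question. I see that you are trying to port the barrier sample from XML to Java DSL, but you have missed many parts of the configuration. You also break the logic in several places:
enricherSpec.header("correlationId", 1)
You use the same static 1 for every request. So when it comes to correlation logic, you will get conflicts between different requests.
.split(s -> s.applySequence(false)
If you are going to implement the barrier, where the logic is really based on correlation, you do need an aggregator. Therefore applySequence must be true for proper aggregation afterwards. For the same reason, header("correlationId" is wrong again. For the barrier we need a correlation key (essentially between the request and the delayed reply). That is the reason for ackCorrelationId in the sample. The other, built-in correlationId (along with the other correlation details) is populated by the splitter and used by the aggregator.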
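Both points above can be illustrated with a toy model. This is a plain-Java sketch under stated assumptions, not the framework's aggregator: SequenceAggregationSketch, Part, split, aggregate and interleaved are hypothetical names, and the release rule (release a group once it holds as many parts as the declared sequence size) is a simplification of sequence-based release. It shows that parts without sequence details are never released, and that two requests sharing one static correlation key get mixed together.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of split + aggregate; all names are illustrative.
final class SequenceAggregationSketch {

    // One split part plus the correlation details a splitter would attach.
    static final class Part {
        final String payload;
        final Object correlationId;
        final int sequenceSize; // 0 models "no sequence details applied"

        Part(String payload, Object correlationId, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
        }
    }

    // Splits a comma-delimited payload; the sequence size is recorded only when applySequence is true.
    static List<Part> split(String payload, Object correlationId, boolean applySequence) {
        String[] tokens = payload.split(",");
        List<Part> parts = new ArrayList<>();
        for (String token : tokens) {
            parts.add(new Part(token, correlationId, applySequence ? tokens.length : 0));
        }
        return parts;
    }

    // Groups parts by correlation id and releases a group once it holds sequenceSize parts.
    static List<List<String>> aggregate(List<Part> parts) {
        Map<Object, List<String>> groups = new HashMap<>();
        List<List<String>> released = new ArrayList<>();
        for (Part part : parts) {
            List<String> group = groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
            group.add(part.payload);
            if (part.sequenceSize > 0 && group.size() == part.sequenceSize) {
                released.add(groups.remove(part.correlationId));
            }
        }
        return released;
    }

    // Two concurrent requests ("a,b" and "x,y,z") whose parts arrive interleaved: x, a, b, y, z.
    static List<Part> interleaved(Object keyForAb, Object keyForXyz) {
        List<Part> ab = split("a,b", keyForAb, true);
        List<Part> xyz = split("x,y,z", keyForXyz, true);
        return Arrays.asList(xyz.get(0), ab.get(0), ab.get(1), xyz.get(1), xyz.get(2));
    }

    public static void main(String[] args) {
        System.out.println(aggregate(split("a,v,b", "req-1", true)));   // [[a, v, b]]
        System.out.println(aggregate(split("a,v,b", "req-1", false)));  // [] (never released)
        System.out.println(aggregate(interleaved(1, 1)));               // [[x, a], [b, y, z]] (mixed)
        System.out.println(aggregate(interleaved("req-1", "req-2")));   // [[a, b], [x, y, z]]
    }
}
```

The third line of output is the conflict mentioned above: with the static key 1, the first released group contains parts from two different requests.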
Your request times out because you removed the barrier and do not send a response.
@Bean
public IntegrationFlow integrationFlow(RabbitTemplate amqpTemplate) {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .enrich(enricherSpec -> {
                enricherSpec.header("correlationId", 1); // ackCorrelationId
            })
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            // .barrier(1000L)
            .log()
            .publishSubscribeChannel(pubsub -> pubsub
                    .subscribe(f1 -> f1
                            .handle(Amqp.outboundAdapter(amqpTemplate)
                                    .exchangeName("barrierExchange")
                                    .routingKey("barrierKey")
                                    .confirmAckChannel(confirmAckChannel())
                                    .confirmCorrelationExpression("payload")))
                    .subscribe(f2 -> f2.handle((p, h) -> "Processed OK")))
            .get();
}
Also, when you implement the barrier, your correlation expression must be #this; otherwise you will lose the barrier correlation header.
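Why the header gets lost can be seen in the ACK output in the question: with the correlation set to the payload, the confirm message carries only amqp_publishConfirm. The following plain-Java sketch models that idea and is not the framework's barrier: BarrierCorrelationSketch, Msg, Barrier and the helper methods are hypothetical, and the header name ackCorrelationId is taken from the sample mentioned above. A waiting request is released only if the confirm message still carries the key it was suspended under.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a barrier keyed by a message header; all names are illustrative.
final class BarrierCorrelationSketch {

    static final String KEY_HEADER = "ackCorrelationId";

    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    static final class Barrier {
        private final Map<Object, CompletableFuture<Msg>> waiting = new ConcurrentHashMap<>();

        // Registers a waiter under the request's correlation key.
        CompletableFuture<Msg> suspend(Msg request) {
            return waiting.computeIfAbsent(request.headers.get(KEY_HEADER), k -> new CompletableFuture<>());
        }

        // Releases the waiter whose key matches the trigger's header; false if nothing matches.
        boolean trigger(Msg ack) {
            Object key = ack.headers.get(KEY_HEADER);
            CompletableFuture<Msg> waiter = (key == null) ? null : waiting.remove(key);
            return waiter != null && waiter.complete(ack);
        }
    }

    // Models correlating on the whole message: the request headers survive in the ack.
    static Msg ackFromWholeMessage(Msg sent) {
        return new Msg(sent.payload, sent.headers);
    }

    // Models correlating on the payload only: the ack no longer has the request headers.
    static Msg ackFromPayloadOnly(Msg sent) {
        return new Msg(sent.payload, Map.of());
    }

    // Returns true if the suspended request was released by the ack.
    static boolean released(boolean wholeMessage) {
        Barrier barrier = new Barrier();
        Msg request = new Msg("a", Map.of(KEY_HEADER, "req-1"));
        CompletableFuture<Msg> reply = barrier.suspend(request);
        barrier.trigger(wholeMessage ? ackFromWholeMessage(request) : ackFromPayloadOnly(request));
        return reply.isDone();
    }

    public static void main(String[] args) {
        System.out.println(released(true));  // true
        System.out.println(released(false)); // false
    }
}
```

In the payload-only case the waiter stays suspended, which in a real flow would again end in a timeout on the client side.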
我有以下服务器配置:
@Configuration
@EnableIntegration
public class Config {
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.enrich(enricherSpec -> {
enricherSpec.header("correlationId", 1); //ackCorrelationId
})
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
//.barrier(1000L)
.log()
.handle(Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey")
.confirmAckChannel(confirmAckChannel())
.confirmCorrelationExpression("payload")
)
.get();
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost("localhost");
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
@Bean
public AmqpTemplate amqpTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean
public DirectChannel confirmAckChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow ackChannelListener() {
return IntegrationFlows.from(confirmAckChannel())
.handle(m -> {
System.out.println("ACK:" + m);
})
.get();
}
}
和以下客户端配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(consoleSource(), consoleConsumer())
.handle(httpOutboundGateway())
.log()
.channel("httpRequestChannel")
.handle(s -> {
System.out.println("We got response: " + s);
})
.get();
}
private HttpMessageHandlerSpec httpOutboundGateway() {
return Http.outboundGateway("http://localhost:8080/spring_integration_post") //http://localhost:8080/my_post
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class);
}
private Consumer<SourcePollingChannelAdapterSpec> consoleConsumer() {
return c -> c.poller(Pollers.fixedRate(1000)
.maxMessagesPerPoll(1));
}
public MessageSource<String> consoleSource() {
return CharacterStreamReadingMessageSource.stdin();
}
}
我从客户端发送a,v,b
我看到 服务器 接受该消息,将 3 条消息发送到 rabbitMq(通过 rabbit admin 我看到消息确实被 rabbit 接受)并得到 3 次确认:
ACK:GenericMessage [payload=a, headers={amqp_publishConfirm=true, id=eb8fd94b-5721-8b3b-5219-b7a4e0d950a8, timestamp=1567062603387}]
ACK:GenericMessage [payload=v, headers={amqp_publishConfirm=true, id=724b7def-1ab2-79b9-c788-27f6cbe24a33, timestamp=1567062603388}]
ACK:GenericMessage [payload=b, headers={amqp_publishConfirm=true, id=12a10799-d664-64dc-dd9e-1601fda61977, timestamp=1567062603389}]
但是 client 打印以下跟踪:
2019-08-29 10:10:04.392 ERROR 18404 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=a,v,b, headers={id=0b902e6f-d4de-329f-9916-19f94cdedc4e, timestamp=1567062603152}]
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
... 30 more
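Why does this happen?
How can I fix it?
P.S.
As @Artem Bilan said, my problem was that I never sent any response to the client, so the client ran into a timeout error. I have to say the error is really confusing: I would expect a 504 error for a timeout.
After all the messages had been sent to RabbitMQ, I tried to return something to the client, so I wrote the following configuration: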
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey")
.confirmAckChannel(confirmAckChannel())
.confirmCorrelationFunction(Message::getPayload)
).handle((payload, headers) -> {
System.out.println("Before aggregation");
return true;
})
.aggregate()
.handle((payload, headers) -> {
System.out.println("After aggregation");
return true;
}).get();
}
But on the client I see the same stack trace. On the server side I also see this trace:
2019-08-30 12:19:10.166 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.167 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.172 INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2019-08-30 12:19:10.198 INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#123d7057:0/SimpleConnection@2ae574e5 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63045]
2019-08-30 12:19:10.224 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
2019-08-30 12:19:10.225 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
ACK:GenericMessage [payload=A, headers={amqp_publishConfirm=true, id=5d85f459-661d-69c4-4d36-b5843289b41b, timestamp=1567156750227}]
2019-08-30 12:19:10.227 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
2019-08-30 12:19:10.228 INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, 
x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
ACK:GenericMessage [payload=B, headers={amqp_publishConfirm=true, id=1136d70b-8dc5-2397-f936-0246c0ad8073, timestamp=1567156750231}]
ACK:GenericMessage [payload=C, headers={amqp_publishConfirm=true, id=1b74d944-ec4c-75c2-83f5-3b36a8e89a39, timestamp=1567156750232}]
This means my 2 handlers are never called at all. Why?
Update
I was also able to apply the suggestion about publish-subscribe:
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
// .enrich(enricherSpec -> {
// enricherSpec.header("correlationId", 1); //ackCorrelationId
// })
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
//.barrier(1000L)
.log()
.publishSubscribeChannel(publishSubscribeSpec -> {
publishSubscribeSpec.applySequence(true);
publishSubscribeSpec.subscribe(f -> Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey")
.confirmAckChannel(confirmAckChannel())
.confirmCorrelationFunction(Message::getPayload));
publishSubscribeSpec.subscribe(flow -> {
flow.handle((p, h) -> "from server: " + p);
});
}
).get();
}
The error went away, but the client only gets the last split part:
1,2,3,4,5
2019-08-30 12:48:24.570 INFO 19476 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler : GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
We got response: GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
As I understand it, in this case the 2 subflows are independent, and for some reason only the last part is sent to the client....
Your problem is here:
.handle(Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey")
.confirmAckChannel(confirmAckChannel())
.confirmCorrelationExpression("payload")
)
.get();
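You simply stop your flow there, while it starts from Http.inboundGateway(), which expects some reply to be sent back. With Amqp.outboundAdapter() you make the flow one-way, so there is no reply to send back to the waiting inbound gateway.
Since no reply is sent back to the client, the request ends with that request timeout error, which indicates that the server did not send a reply in time.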
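The reasoning above can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not Spring Integration's actual implementation: the class ReplyTimeoutModel, the method exchange and the status strings are hypothetical names chosen for illustration, and the "500" string simply mirrors the status the client reported in the question. The gateway hands the request to the flow together with a reply channel and then waits a bounded time for a reply.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class ReplyTimeoutModel {

    // Hypothetical stand-in for an inbound gateway: it passes the request and a
    // reply channel to the flow, then waits up to timeoutMillis for a reply.
    static String exchange(String request,
                           BiConsumer<String, BlockingQueue<String>> flow,
                           long timeoutMillis) {
        BlockingQueue<String> replyChannel = new ArrayBlockingQueue<>(1);
        flow.accept(request, replyChannel);
        String reply = null;
        try {
            reply = replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // No reply in time: the caller only sees a generic server-side failure.
        return reply != null ? "200 " + reply : "500 no reply within timeout";
    }

    public static void main(String[] args) {
        // One-way flow: consumes the request and never produces a reply
        // (the role an outbound channel adapter plays at the end of a flow).
        BiConsumer<String, BlockingQueue<String>> oneWay = (req, replies) -> { };
        // Request-reply flow: always puts something on the reply channel.
        BiConsumer<String, BlockingQueue<String>> requestReply =
                (req, replies) -> replies.offer("Processed OK");

        System.out.println(exchange("a,v,b", oneWay, 100));       // 500 no reply within timeout
        System.out.println(exchange("a,v,b", requestReply, 100)); // 200 Processed OK
    }
}
```

In this model the server-side work in the one-way flow can succeed completely (just as the three messages reached RabbitMQ and were acked), yet the caller still gets an error because nothing is ever put on the reply channel.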
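That is the answer for your case. How to fix it, and everything else, is beyond the scope of this question. I see that you are trying to port the barrier sample from XML to the Java DSL, but you have left out many parts of its configuration. You also break the logic in several places:
enricherSpec.header("correlationId", 1): you use the same static 1 for every request, so when it comes to the correlation logic you will get collisions between different requests.
.split(s -> s.applySequence(false): if you are going to implement the barrier, where the logic really is based on correlation, you will indeed need an aggregator, so applySequence must be true for proper aggregation afterwards. This is also why header("correlationId" is wrong again: for the barrier we need a correlation key (essentially between the request and the deferred reply), which is why the sample uses ackCorrelationId. The other, built-in correlationId (along with the other correlation details) is populated by the splitter and used by the aggregator.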
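To make the two correlation points above concrete, here is a simplified plain-Java model. It is a sketch under stated assumptions, not Spring Integration's splitter or aggregator: CorrelationModel, Part, split, interleave and aggregate are hypothetical names, and the keys "req-1" and "req-2" are made up. Each split part carries a correlation key and the total part count; the aggregator groups by key and releases a group once it holds that many parts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CorrelationModel {

    // One split part plus the sequence details a splitter would stamp on it.
    static final class Part {
        final String payload;
        final Object correlationId;
        final int sequenceSize;

        Part(String payload, Object correlationId, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
        }
    }

    // Splits a comma-delimited request; every part gets the same key and part count.
    static List<Part> split(String request, Object correlationId) {
        String[] tokens = request.split(",");
        List<Part> parts = new ArrayList<>();
        for (String token : tokens) {
            parts.add(new Part(token, correlationId, tokens.length));
        }
        return parts;
    }

    // Simulates two concurrent requests whose parts arrive alternately.
    static List<Part> interleave(List<Part> first, List<Part> second) {
        List<Part> mixed = new ArrayList<>();
        for (int i = 0; i < Math.max(first.size(), second.size()); i++) {
            if (i < first.size()) mixed.add(first.get(i));
            if (i < second.size()) mixed.add(second.get(i));
        }
        return mixed;
    }

    // Groups parts by key and releases a group once it holds sequenceSize parts.
    static List<List<String>> aggregate(List<Part> parts) {
        Map<Object, List<String>> groups = new LinkedHashMap<>();
        List<List<String>> released = new ArrayList<>();
        for (Part part : parts) {
            List<String> group =
                    groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
            group.add(part.payload);
            if (group.size() == part.sequenceSize) {
                released.add(groups.remove(part.correlationId));
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // A distinct key per request keeps the groups apart.
        System.out.println(aggregate(interleave(
                split("a,v,b", "req-1"), split("1,2", "req-2")))); // [[1, 2], [a, v, b]]
        // A static key of 1 for every request mixes parts of different requests.
        System.out.println(aggregate(interleave(
                split("a,v,b", 1), split("1,2", 1))));             // [[a, 1], [v, 2]]
    }
}
```

With the static key, parts of unrelated requests end up in the same group and the last part (b) is never released, which is the kind of collision the answer warns about.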
Your request times out because you removed the barrier and never send a response.
@Bean
public IntegrationFlow integrationFlow(RabbitTemplate amqpTemplate) {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.enrich(enricherSpec -> {
enricherSpec.header("correlationId", 1); // ackCorrelationId
})
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
// .barrier(1000L)
.log()
.publishSubscribeChannel(pubsub -> pubsub
.subscribe(f1 -> f1
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("barrierExchange")
.routingKey("barrierKey")
.confirmAckChannel(confirmAckChannel())
.confirmCorrelationExpression("payload")))
.subscribe(f2 -> f2.handle((p, h) -> "Processed OK")))
.get();
}
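Also, when you do implement the barrier, your confirm correlation expression must be #this; otherwise you will lose the barrier correlation header.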