使用 wireTap 时 replyChannel 超时
Timeout on replyChannel when wireTap is used
我们正在使用 wireTap 在流的不同部分获取时间戳。当引入最新流时,它开始导致 replyChannel 超时。根据我从文档中了解到的情况,wireTap 确实会拦截消息并将其发送到辅助通道,同时不会影响主流 - 因此它看起来是用来拍摄所述时间戳快照的完美工具。我们是否在工作中使用了错误的组件,或者配置有问题?如果是这样,您建议如何注册此类信息?
异常:
o.s.integration.core.MessagingTemplate : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000
代码:
@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper) {
final MarshallingWebServiceInboundGateway inboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
return inboundGateway;
}
@Bean
public IntegrationFlow querySynchronous() {
return IntegrationFlows.from(INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.handle(outboundGateway)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
//.transform( m -> m) // for tests - REMOVE
.get();
}
时间戳流:
public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
return channel -> channel.handle(
m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}
这里值得注意的是,如果我取消对无操作变压器的注释,突然一切正常,但听起来不对,我想避免这种解决方法。
另一件事是,另一个非常相似的流程可以正常工作,没有任何解决方法。显着的区别在于它使用 kafka 适配器将消息放入 kafka,而不是使用出站网关调用某些 Web 服务。它仍然会生成要处理的响应(使用 generateResponseFlow()),因此它的行为方式应该相同。这是流程,工作正常:
@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {
MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
aeoNotificationInboundGateway.setErrorChannel(errorChannel);
return aeoNotificationInboundGateway;
}
@Bean
public IntegrationFlow workingEnqueue() {
return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.filter(...)
.filter(...)
.publishSubscribeChannel(channel -> channel
.subscribe(sendToKafkaFlow())
.subscribe(generateResponseFlow()))
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
.get();
}
那么,wireTap作为最后一个组件就没有问题了,replyChannel及时正确接收了响应,没有任何变通方法。
该行为是预期的。
流程末尾使用wireTap()
(或log()
)时,默认不回复。
由于我们无法假设您尝试将什么逻辑包含到流程定义中,因此我们会尽最大努力使用默认行为 - 流程变成一种单向的、发送后不管的流程:有些人真的要求让它成为非log()
...
后可回复
要让它仍然回复来电者,您需要在流程末尾添加一个 bridge()
。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log
它适用于您非常复杂的场景,因为您的 publishSubscribeChannel
的订阅者之一是 generateResponseFlow()
的回复。老实说,你需要小心请求-回复行为和这样的 publishSubscribeChannel
配置。 replyChannel
只能接受一个回复,如果您希望收到多个订阅者的回复,您会惊讶于这种行为有多么奇怪。
您的配置中的 wireTap
不是订阅者,它是注入到 publishSubscribeChannel
中的拦截器。因此,您关于相似性的假设具有误导性。窃听后流程结束,但由于其中一个订阅者正在回复,您会得到预期的行为。让我们来看看 publishSubscribeChannel
作为并联电路,其中所有连接都独立于其他连接获得电力。他们执行自己的工作而不影响所有其他人。无论如何,这是不同的故事。
总结:要从 wireTap()
之后的流程中回复,您需要指定一个 bridge()
并且回复消息将从调用方正确路由到 replyChannel
。
我们正在使用 wireTap 在流的不同部分获取时间戳。当引入最新流时,它开始导致 replyChannel 超时。根据我从文档中了解到的情况,wireTap 确实会拦截消息并将其发送到辅助通道,同时不会影响主流 - 因此它看起来是用来拍摄所述时间戳快照的完美工具。我们是否在工作中使用了错误的组件,或者配置有问题?如果是这样,您建议如何注册此类信息?
异常:
o.s.integration.core.MessagingTemplate : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000
代码:
@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper) {
final MarshallingWebServiceInboundGateway inboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
return inboundGateway;
}
@Bean
public IntegrationFlow querySynchronous() {
return IntegrationFlows.from(INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.handle(outboundGateway)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
//.transform( m -> m) // for tests - REMOVE
.get();
}
时间戳流:
public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
return channel -> channel.handle(
m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}
这里值得注意的是,如果我取消对无操作变压器的注释,突然一切正常,但听起来不对,我想避免这种解决方法。
另一件事是,另一个非常相似的流程可以正常工作,没有任何解决方法。显着的区别在于它使用 kafka 适配器将消息放入 kafka,而不是使用出站网关调用某些 Web 服务。它仍然会生成要处理的响应(使用 generateResponseFlow()),因此它的行为方式应该相同。这是流程,工作正常:
@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {
MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
aeoNotificationInboundGateway.setErrorChannel(errorChannel);
return aeoNotificationInboundGateway;
}
@Bean
public IntegrationFlow workingEnqueue() {
return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.filter(...)
.filter(...)
.publishSubscribeChannel(channel -> channel
.subscribe(sendToKafkaFlow())
.subscribe(generateResponseFlow()))
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
.get();
}
那么,wireTap作为最后一个组件就没有问题了,replyChannel及时正确接收了响应,没有任何变通方法。
该行为是预期的。
流程末尾使用wireTap()
(或log()
)时,默认不回复。
由于我们无法假设您尝试将什么逻辑包含到流程定义中,因此我们会尽最大努力使用默认行为 - 流程变成一种单向的、发送后不管的流程:有些人真的要求让它成为非log()
...
要让它仍然回复来电者,您需要在流程末尾添加一个 bridge()
。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log
它适用于您非常复杂的场景,因为您的 publishSubscribeChannel
的订阅者之一是 generateResponseFlow()
的回复。老实说,你需要小心请求-回复行为和这样的 publishSubscribeChannel
配置。 replyChannel
只能接受一个回复,如果您希望收到多个订阅者的回复,您会惊讶于这种行为有多么奇怪。
您的配置中的 wireTap
不是订阅者,它是注入到 publishSubscribeChannel
中的拦截器。因此,您关于相似性的假设具有误导性。窃听后流程结束,但由于其中一个订阅者正在回复,您会得到预期的行为。让我们来看看 publishSubscribeChannel
作为并联电路,其中所有连接都独立于其他连接获得电力。他们执行自己的工作而不影响所有其他人。无论如何,这是不同的故事。
总结:要从 wireTap()
之后的流程中回复,您需要指定一个 bridge()
并且回复消息将从调用方正确路由到 replyChannel
。