使用 wireTap 时 replyChannel 超时

Timeout on replyChannel when wireTap is used

我们正在使用 wireTap 在流的不同部分获取时间戳。当引入最新流时,它开始导致 replyChannel 超时。根据我从文档中了解到的情况,wireTap 确实会拦截消息并将其发送到辅助通道,同时不会影响主流 - 因此它看起来是用来拍摄所述时间戳快照的完美工具。我们是否在工作中使用了错误的组件,或者配置有问题?如果是这样,您建议如何注册此类信息?

异常:

o.s.integration.core.MessagingTemplate   : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000

代码:

@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper) {

    final MarshallingWebServiceInboundGateway inboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
    inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    return inboundGateway;
}

@Bean
public IntegrationFlow querySynchronous() {
    return IntegrationFlows.from(INPUT_CHANNEL_NAME)
        .enrichHeaders(...) 
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .handle(outboundGateway)
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
        //.transform( m -> m) // for tests - REMOVE
        .get();
}

时间戳流:

public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
    return channel -> channel.handle(
        m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}

这里值得注意的是,如果我取消对无操作变压器的注释,突然一切正常,但听起来不对,我想避免这种解决方法。

另一件事是,另一个非常相似的流程可以正常工作,没有任何解决方法。显着的区别在于它使用 kafka 适配器将消息放入 kafka,而不是使用出站网关调用某些 Web 服务。它仍然会生成要处理的响应(使用 generateResponseFlow()),因此它的行为方式应该相同。这是流程,工作正常:

@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {

    MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
    aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    aeoNotificationInboundGateway.setErrorChannel(errorChannel);
    return aeoNotificationInboundGateway;
}

@Bean
public IntegrationFlow workingEnqueue() {
    return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
        .enrichHeaders(...)
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .filter(...)
        .filter(...)
        .publishSubscribeChannel(channel -> channel
            .subscribe(sendToKafkaFlow())
            .subscribe(generateResponseFlow()))
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
        .get();
}

那么,wireTap作为最后一个组件就没有问题了,replyChannel及时正确接收了响应,没有任何变通方法。

该行为是预期的。 流程末尾使用wireTap()(或log())时,默认不回复。 由于我们无法假设您尝试将什么逻辑包含到流程定义中,因此我们会尽最大努力使用默认行为 - 流程变成一种单向的、发送后不管的流程:有些人真的要求让它成为非log() ...

后可回复

要让它仍然回复来电者,您需要在流程末尾添加一个 bridge()。 在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log

它适用于您非常复杂的场景,因为您的 publishSubscribeChannel 的订阅者之一是 generateResponseFlow() 的回复。老实说,你需要小心请求-回复行为和这样的 publishSubscribeChannel 配置。 replyChannel 只能接受一个回复,如果您希望收到多个订阅者的回复,您会惊讶于这种行为有多么奇怪。

您的配置中的 wireTap 不是订阅者,它是注入到 publishSubscribeChannel 中的拦截器。因此,您关于相似性的假设具有误导性。窃听后流程结束,但由于其中一个订阅者正在回复,您会得到预期的行为。让我们来看看 publishSubscribeChannel 作为并联电路,其中所有连接都独立于其他连接获得电力。他们执行自己的工作而不影响所有其他人。无论如何,这是不同的故事。

总结:要从 wireTap() 之后的流程中回复,您需要指定一个 bridge() 并且回复消息将从调用方正确路由到 replyChannel