Spring 集成 Java DSL HTTP 在超时错误内未收到回复

Spring Integration Java DSL HTTP no reply received within timeout error

我正在使用 Spring 集成 5.0.6。我已经查看了它的文档并创建了以下代码,该代码侦听 HTTP 端点并发布到 kafka 主题。

一切正常,我也在主题上收到消息。但是在 HTTP 客户端没有发送回复,它给出 "No reply received within timeout".

如何在以下代码中将回复发送回 http 调用方:

@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class).replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
            ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {

        return Kafka.outboundChannelAdapter(producerFactory)
                .messageKey("key").headerMapper(mapper())
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
    }

感谢您的帮助。

你的问题你用了one-way Kafka.outboundChannelAdapter(producerFactory)。这仅适用于 "send-and-forget".

如果你有兴趣制作一些后续流程或者只是需要回复HTTP请求,你应该考虑使用a:

/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link IntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {

在流程定义中,您的第一个订阅者实际上是 Kafka.outboundChannelAdapter(producerFactory),第二个订阅者可能是提到的 .enrichHeaders()。如果您什么都不做,最后一个将把它的结果发送到 replyChannel header,因此,将到达一个 HTTP 响应。

在这个 publish-subscribe 场景中,您应该记住,第二个订阅者的 payload 将与您尝试发送到 Kafka 的相同。