Spring 集成 - 2 个 MessageHandler 处理一半的消息

Spring Integration - 2 MessageHandlers handle half of messages

我正在使用 spring 集成框架,带有 Transformer

inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 作者

@Bean
public DirectChannel inboundChannel() {
    return new DirectChannel();
}

   @Bean
    public DirectChannel outboundChannel() {
        return new DirectChannel();
    }

@Bean
@Transformer(inputChannel="inboundChannel", outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
    return new JsonToObjectTransformer(Item.class);
}




@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
    JdbcMessageHandler jdbcMessageHandler = new ...

    return ...;
}

@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
    return new ...;
}

在我覆盖的两个处理程序中

public void handleMessage(Message<?> message)

问题:如果在kafka中总共有N条消息, 然后每个 handleMessage() 被调用正好 n/2 次!

我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道并且总共有 n 条消息。

我错过了什么?

(如果我禁用 kafak 处理程序,第二个处理程序将获取所有 n 条消息)

更新:
我需要订阅者从同一个频道获取所有消息(kafka 处理程序将对原始数据做一些事情,jdbc 处理程序将推送转换后的数据 数据)

首先,您的 inboundChanneloutboundChannel 已不再使用:您无处(至少在问题中)指定了它们的名称。

inputoutput 这样的名称被框架采用并用于创建新的MessageChannel bean,这些bean 在其他地方使用。

现在看看你有什么:

@Transformer(inputChannel="input"
@ServiceActivator(inputChannel = "input")

他们都是同一个 input 频道的订阅者,因为它是由框架自动创建的 DirectChannel。此通道基于循环 LoadBalancingStrategy,因此您在 Kafka 中看到 n/2,因为它的服务激活器仅处理发送到该 input 通道的每秒消息。

请在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/core.html#channel-configuration-directchannel