Spring 集成 - 2 个 MessageHandler 处理一半的消息
Spring Integration - 2 MessageHandlers handle half of messages
我正在使用 spring 集成框架,带有 Transformer
inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 作者
@Bean
public DirectChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@Transformer(inputChannel="inboundChannel", outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(Item.class);
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new ...
return ...;
}
@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
return new ...;
}
在我覆盖的两个处理程序中
public void handleMessage(Message<?> message)
问题:如果在kafka中总共有N条消息,
然后每个 handleMessage() 被调用正好 n/2 次!
我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道并且总共有 n 条消息。
我错过了什么?
(如果我禁用 kafak 处理程序,第二个处理程序将获取所有 n 条消息)
更新:
我需要订阅者从同一个频道获取所有消息(kafka 处理程序将对原始数据做一些事情,jdbc 处理程序将推送转换后的数据
数据)
首先,您的 inboundChannel
和 outboundChannel
已不再使用:您无处(至少在问题中)指定了它们的名称。
像input
和output
这样的名称被框架采用并用于创建新的MessageChannel
bean,这些bean 在其他地方使用。
现在看看你有什么:
@Transformer(inputChannel="input"
@ServiceActivator(inputChannel = "input")
他们都是同一个 input
频道的订阅者,因为它是由框架自动创建的 DirectChannel
。此通道基于循环 LoadBalancingStrategy
,因此您在 Kafka 中看到 n/2
,因为它的服务激活器仅处理发送到该 input
通道的每秒消息。
请在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/core.html#channel-configuration-directchannel
我正在使用 spring 集成框架,带有 Transformer
inputChannel -> kafka 消费者
outputChannel -> 数据库 jdbc 作者
@Bean
public DirectChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@Transformer(inputChannel="inboundChannel", outputChannel="outboundChannel")
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(Item.class);
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler jdbcmessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new ...
return ...;
}
@Bean
@ServiceActivator(inputChannel = "inboundChannel")
public MessageHandler kafkahandler() {
return new ...;
}
在我覆盖的两个处理程序中
public void handleMessage(Message<?> message)
问题:如果在kafka中总共有N条消息, 然后每个 handleMessage() 被调用正好 n/2 次!
我假设每个处理程序将被调用 n 次,因为每个处理程序链接到不同的通道并且总共有 n 条消息。
我错过了什么?
(如果我禁用 kafak 处理程序,第二个处理程序将获取所有 n 条消息)
更新:
我需要订阅者从同一个频道获取所有消息(kafka 处理程序将对原始数据做一些事情,jdbc 处理程序将推送转换后的数据
数据)
首先,您的 inboundChannel
和 outboundChannel
已不再使用:您无处(至少在问题中)指定了它们的名称。
像input
和output
这样的名称被框架采用并用于创建新的MessageChannel
bean,这些bean 在其他地方使用。
现在看看你有什么:
@Transformer(inputChannel="input"
@ServiceActivator(inputChannel = "input")
他们都是同一个 input
频道的订阅者,因为它是由框架自动创建的 DirectChannel
。此通道基于循环 LoadBalancingStrategy
,因此您在 Kafka 中看到 n/2
,因为它的服务激活器仅处理发送到该 input
通道的每秒消息。
请在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/core.html#channel-configuration-directchannel