多线程Executor通道加速消费者进程
Multithreaded Executor channel to speed up the consumer process
我有一个消息生成器,它生成大约 15 个 messages/second
消费者是一个 spring 集成项目,它从消息队列中消费并进行大量处理。目前它是单线程的,无法与生产者发送消息的速率相匹配。因此队列深度不断增加
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();
是否可以使用执行器通道在多线程环境中处理这个并加速消费者进程
如果是,请建议我如何实现 - 为了确保消息的顺序,我需要将相同类型的消息(基于消息的 id)分配给执行程序通道的同一线程。
[更新代码]
我创建了以下执行程序通道
public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
现在我从主消息流重定向到自定义路由器,该路由器将消息转发到执行程序通道,如下所示 -
@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
我将为相应的执行者通道提供单独的消费者流。
请指正我的理解
[更新]
@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}
因此执行者通道上的 BridgeTo 将转发消息
hence the queue depth keeps on increasing
因为看起来您的队列在 JMS 代理上的某处,所以确实可以有这样的行为。这正是消息传递系统的设计目的 - 区分生产者和消费者并尽可能在目的地处理消息。
如果你想增加一个来自 JMS 的轮询,你可以考虑在 JMS 容器上有一个 concurrency
选项:
/**
* The concurrency to use.
* @param concurrency the concurrency.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrency(String)
*/
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
this.target.setConcurrency(concurrency);
return this;
}
/**
* The concurrent consumers number to use.
* @param concurrentConsumers the concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return this;
}
/**
* The max for concurrent consumers number to use.
* @param maxConcurrentConsumers the max concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return this;
}
但这不允许您"asign messages to the specific thread"。在 JMS 中就像没有办法分区一样。
我们可以根据您的 "based on the id of the message" 和配置有单线程 Executor
的特定 ExecutorChannel
实例,通过 Spring 集成使用 router
来做到这一点。每个 ExecutorChannel
都将成为它的专用执行器,只有一个线程。这样您将确保具有相同分区键的消息的顺序,并且您将并行处理它们。所有 ExecutorChannel
可以有相同的订阅者或 bridge
到相同的频道进行处理。
但是您需要记住,当您离开 JMS 侦听器线程时,您完成了 JMS 事务并且无法处理该单独线程中的消息,您可能会丢失一条消息。
我有一个消息生成器,它生成大约 15 个 messages/second
消费者是一个 spring 集成项目,它从消息队列中消费并进行大量处理。目前它是单线程的,无法与生产者发送消息的速率相匹配。因此队列深度不断增加
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();
是否可以使用执行器通道在多线程环境中处理这个并加速消费者进程
如果是,请建议我如何实现 - 为了确保消息的顺序,我需要将相同类型的消息(基于消息的 id)分配给执行程序通道的同一线程。
[更新代码] 我创建了以下执行程序通道
public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
现在我从主消息流重定向到自定义路由器,该路由器将消息转发到执行程序通道,如下所示 -
@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
我将为相应的执行者通道提供单独的消费者流。
请指正我的理解
[更新]
@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}
因此执行者通道上的 BridgeTo 将转发消息
hence the queue depth keeps on increasing
因为看起来您的队列在 JMS 代理上的某处,所以确实可以有这样的行为。这正是消息传递系统的设计目的 - 区分生产者和消费者并尽可能在目的地处理消息。
如果你想增加一个来自 JMS 的轮询,你可以考虑在 JMS 容器上有一个 concurrency
选项:
/**
* The concurrency to use.
* @param concurrency the concurrency.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrency(String)
*/
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
this.target.setConcurrency(concurrency);
return this;
}
/**
* The concurrent consumers number to use.
* @param concurrentConsumers the concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return this;
}
/**
* The max for concurrent consumers number to use.
* @param maxConcurrentConsumers the max concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return this;
}
但这不允许您"asign messages to the specific thread"。在 JMS 中就像没有办法分区一样。
我们可以根据您的 "based on the id of the message" 和配置有单线程 Executor
的特定 ExecutorChannel
实例,通过 Spring 集成使用 router
来做到这一点。每个 ExecutorChannel
都将成为它的专用执行器,只有一个线程。这样您将确保具有相同分区键的消息的顺序,并且您将并行处理它们。所有 ExecutorChannel
可以有相同的订阅者或 bridge
到相同的频道进行处理。
但是您需要记住,当您离开 JMS 侦听器线程时,您完成了 JMS 事务并且无法处理该单独线程中的消息,您可能会丢失一条消息。