集成模式:如何同步处理从多个系统接收到的消息
Integration pattern : how to sync processing message received from multiple systems
我正在构建一个系统,该系统将通过消息代理(目前为 JMS)从不同系统接收消息。来自所有发送方系统的所有消息都有一个 deviceId,消息的接收没有顺序。
例如,系统 A 可以发送 deviceId=1 的消息,系统 b 可以发送 deviceId=2 的消息。
我的目标是不开始处理关于同一个 deviceId 的消息,除非我从具有相同 deviceId 的所有发件人那里收到所有消息。
例如,如果我有 3 个系统 A、B 和 C 向我的系统发送消息:
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.
是否应该通过在我的系统中使用某种同步机制、消息代理或像 spring-integration/apache camel 这样的集成框架来解决这个问题?
您可以使用缓存组件在 Apache Camel 中执行此操作。我认为有 EHCache 组件。
本质上:
- 您收到一条带有给定 deviceId 的消息说 deviceId1。
- 您在缓存中查找已收到 deviceId1 的哪些消息。
- 只要您还没有收到所有三个,您就会将当前 system/message 添加到缓存中。
- 一旦所有消息都存在,您就可以处理并清除缓存。
然后您可以将每条传入消息路由到特定的基于 deviceId 的队列进行临时存储。这可以是 JMS、ActiveMQ 或类似的东西。
Spring Integration 为这类任务提供组件 - 在收集整个组之前不要发出。它的名字是 Aggregator。你的deviceId
绝对是correlationKey
。 releaseStrategy
实际上可能基于系统的数量 - 在继续下一步之前您等待了多少 deviceId1
消息。
聚合器的类似解决方案(@Artem Bilan 提到的)也可以在 Camel 中使用自定义 AggregationStrategy
实现,并通过使用 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP
[=26= 控制聚合器完成].
以下内容可能是一个很好的起点。 (You can find the sample project with tests here)
路线:
from("direct:start")
.log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
.aggregate(header("deviceId"), new SignalAggregationStrategy(3))
.log(LoggingLevel.INFO, "Signaled body: ${body}")
.to("direct:result");
SignalAggregationStrategy.java
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
private int numberOfSystems;
public SignalAggregationStrategy(int numberOfSystems) {
this.numberOfSystems = numberOfSystems;
}
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange exchange = super.aggregate(oldExchange, newExchange);
List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
// Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
// https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
}
return exchange;
}
@Override
public boolean matches(Exchange exchange) {
// make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
return false;
}
}
希望对您有所帮助!
我正在构建一个系统,该系统将通过消息代理(目前为 JMS)从不同系统接收消息。来自所有发送方系统的所有消息都有一个 deviceId,消息的接收没有顺序。 例如,系统 A 可以发送 deviceId=1 的消息,系统 b 可以发送 deviceId=2 的消息。
我的目标是不开始处理关于同一个 deviceId 的消息,除非我从具有相同 deviceId 的所有发件人那里收到所有消息。
例如,如果我有 3 个系统 A、B 和 C 向我的系统发送消息:
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.
是否应该通过在我的系统中使用某种同步机制、消息代理或像 spring-integration/apache camel 这样的集成框架来解决这个问题?
您可以使用缓存组件在 Apache Camel 中执行此操作。我认为有 EHCache 组件。
本质上:
- 您收到一条带有给定 deviceId 的消息说 deviceId1。
- 您在缓存中查找已收到 deviceId1 的哪些消息。
- 只要您还没有收到所有三个,您就会将当前 system/message 添加到缓存中。
- 一旦所有消息都存在,您就可以处理并清除缓存。
然后您可以将每条传入消息路由到特定的基于 deviceId 的队列进行临时存储。这可以是 JMS、ActiveMQ 或类似的东西。
Spring Integration 为这类任务提供组件 - 在收集整个组之前不要发出。它的名字是 Aggregator。你的deviceId
绝对是correlationKey
。 releaseStrategy
实际上可能基于系统的数量 - 在继续下一步之前您等待了多少 deviceId1
消息。
聚合器的类似解决方案(@Artem Bilan 提到的)也可以在 Camel 中使用自定义 AggregationStrategy
实现,并通过使用 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP
[=26= 控制聚合器完成].
以下内容可能是一个很好的起点。 (You can find the sample project with tests here)
路线:
from("direct:start")
.log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
.aggregate(header("deviceId"), new SignalAggregationStrategy(3))
.log(LoggingLevel.INFO, "Signaled body: ${body}")
.to("direct:result");
SignalAggregationStrategy.java
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
private int numberOfSystems;
public SignalAggregationStrategy(int numberOfSystems) {
this.numberOfSystems = numberOfSystems;
}
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange exchange = super.aggregate(oldExchange, newExchange);
List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
// Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
// https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
}
return exchange;
}
@Override
public boolean matches(Exchange exchange) {
// make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
return false;
}
}
希望对您有所帮助!