Spring 集成 - 优先聚合器
Spring Integration - Priority aggregator
我有以下应用要求:
- 消息从 RabbitMq 接收,然后根据一些更复杂的规则聚合,例如- 基于
types
属性(具有预先给定的类型时间映射)和基于现有时间消息已在队列中等待(old
属性)
- 所有消息都应该以某种可变的消息速率发布,例如1msg/sec 到 100msg/sec。此速率由服务控制和设置,该服务将监视 rabbitmq 队列大小(一个与此组件无关的队列,并且在管道的更上方)并且如果队列中的消息过多 - 将降低速率。
正如您在图片中看到的一个用例:三个消息已经聚合并等待下一秒发布(因为当前速率为 1msg/sec
),但就在那个时候,MSG
与 id:10
一起到达,并更新了 AGGREGATED 2
,使其成为优先级第一的消息。所以在下一个 tick 时,我们没有释放 AGGREGATED 3
,而是释放 AGGREGATED 2
,因为它现在具有更高的优先级。
现在的问题是 - 我可以为此使用 Spring Integration Aggregator,因为我不知道它是否支持在聚合过程中对消息进行优先排序?我知道 groupTimeout
,但那个只是调整单个消息组 - 不改变其他组的优先级。是否可以使用 MessageGroupStoreReaper
在新 MSG 到达时按优先级调整所有其他聚合消息?
更新
我做了一些这样的实现 - 现在看起来还不错 - 它在消息到达时聚合消息,比较器根据我的自定义逻辑对消息进行排序。
您认为这会不会有一些问题(并发等)?我可以在日志中看到,多次调用轮询器。这正常吗?
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
此外,这个注释 doit
方法是在运行时增加最大轮询消息数的正确方法吗?
@Bean
public MessageChannel aggregatingChannel(){
return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
Comparator.comparingInt(x -> x),
(m) -> {
ExampleDTO d = (ExampleDTO) m.getPayload();
return d.getId();
}
));
}
class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
private final Log logger = LogFactory.getLog(getClass());
private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
private final Function<Message<?>, K> keyExtractor;
private final NavigableMap<K, Message<?>> keyToAggregatedMessage;
public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
Comparator<? super K> comparator,
Function<Message<?>, K> keyExtractor) {
this.accumulator = accumulator;
this.keyExtractor = keyExtractor;
keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
}
@Override
public Iterator<Message<?>> iterator() {
return keyToAggregatedMessage.values().iterator();
}
@Override
public int size() {
return keyToAggregatedMessage.size();
}
@Override
public boolean offer(Message<?> m) {
logger.info("OFFER");
return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
}
@Override
public Message<?> poll() {
logger.info("POLL");
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
return m != null ? m.getValue() : null;
}
@Override
public Message<?> peek() {
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
return m!= null ? m.getValue() : null;
}
}
// @Scheduled(fixedDelay = 10*1000)
// public void doit(){
// System.out.println("INCREASE POLL");
// pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
// }
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata pollerMetadata(){
PollerMetadata metadata = new PollerMetadata();
metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
metadata.setMaxMessagesPerPoll(1);
return metadata;
}
@Bean
public IntegrationFlow aggregatingFlow(
AmqpInboundChannelAdapter aggregatorInboundChannel,
AmqpOutboundEndpoint aggregatorOutboundChannel,
MessageChannel wtChannel,
MessageChannel aggregatingChannel,
PollerMetadata pollerMetadata
) {
return IntegrationFlows.from(aggregatorInboundChannel)
.wireTap(wtChannel)
.channel(aggregatingChannel)
.handle(aggregatorOutboundChannel)
.get();
}
好吧,如果有新消息要完成,它会到达聚合器,然后会立即发布这样的组(如果您的 ReleaseStrategy
这么说的话)。超时剩余团员继续等待排期
有可能想出智能算法来依靠与 MessageGroupStoreReaper
的单一通用时间表来决定我们是否需要释放该部分组或只是丢弃它。再一次:ReleaseStrategy
应该给我们一个线索来决定是否释放,即使是部分的。当发生丢弃并且我们希望将这些消息保留在聚合器中时,我们需要在一段时间后将它们重新发送回聚合器。过期后,组将从存储中删除,这发生在我们已经发送到丢弃通道时,所以最好延迟它们并让聚合器清理这些组,这样延迟后我们可以安全地将它们发送回新有效期的聚合器作为新组的一部分。
您可能还可以在释放正常组后迭代存储中的所有消息,以调整它们 headers 中的一些时间密钥以用于下一个过期时间。
我知道这很难,但确实没有任何 out-of-the-box 解决方案,因为它的设计目的不是为了影响我们刚刚处理过的其他群体...
我有以下应用要求:
- 消息从 RabbitMq 接收,然后根据一些更复杂的规则聚合,例如- 基于
types
属性(具有预先给定的类型时间映射)和基于现有时间消息已在队列中等待(old
属性) - 所有消息都应该以某种可变的消息速率发布,例如1msg/sec 到 100msg/sec。此速率由服务控制和设置,该服务将监视 rabbitmq 队列大小(一个与此组件无关的队列,并且在管道的更上方)并且如果队列中的消息过多 - 将降低速率。
正如您在图片中看到的一个用例:三个消息已经聚合并等待下一秒发布(因为当前速率为 1msg/sec
),但就在那个时候,MSG
与 id:10
一起到达,并更新了 AGGREGATED 2
,使其成为优先级第一的消息。所以在下一个 tick 时,我们没有释放 AGGREGATED 3
,而是释放 AGGREGATED 2
,因为它现在具有更高的优先级。
现在的问题是 - 我可以为此使用 Spring Integration Aggregator,因为我不知道它是否支持在聚合过程中对消息进行优先排序?我知道 groupTimeout
,但那个只是调整单个消息组 - 不改变其他组的优先级。是否可以使用 MessageGroupStoreReaper
在新 MSG 到达时按优先级调整所有其他聚合消息?
更新
我做了一些这样的实现 - 现在看起来还不错 - 它在消息到达时聚合消息,比较器根据我的自定义逻辑对消息进行排序。
您认为这会不会有一些问题(并发等)?我可以在日志中看到,多次调用轮询器。这正常吗?
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
此外,这个注释 doit
方法是在运行时增加最大轮询消息数的正确方法吗?
@Bean
public MessageChannel aggregatingChannel(){
return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
Comparator.comparingInt(x -> x),
(m) -> {
ExampleDTO d = (ExampleDTO) m.getPayload();
return d.getId();
}
));
}
class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
private final Log logger = LogFactory.getLog(getClass());
private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
private final Function<Message<?>, K> keyExtractor;
private final NavigableMap<K, Message<?>> keyToAggregatedMessage;
public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
Comparator<? super K> comparator,
Function<Message<?>, K> keyExtractor) {
this.accumulator = accumulator;
this.keyExtractor = keyExtractor;
keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
}
@Override
public Iterator<Message<?>> iterator() {
return keyToAggregatedMessage.values().iterator();
}
@Override
public int size() {
return keyToAggregatedMessage.size();
}
@Override
public boolean offer(Message<?> m) {
logger.info("OFFER");
return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
}
@Override
public Message<?> poll() {
logger.info("POLL");
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
return m != null ? m.getValue() : null;
}
@Override
public Message<?> peek() {
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
return m!= null ? m.getValue() : null;
}
}
// @Scheduled(fixedDelay = 10*1000)
// public void doit(){
// System.out.println("INCREASE POLL");
// pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
// }
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata pollerMetadata(){
PollerMetadata metadata = new PollerMetadata();
metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
metadata.setMaxMessagesPerPoll(1);
return metadata;
}
@Bean
public IntegrationFlow aggregatingFlow(
AmqpInboundChannelAdapter aggregatorInboundChannel,
AmqpOutboundEndpoint aggregatorOutboundChannel,
MessageChannel wtChannel,
MessageChannel aggregatingChannel,
PollerMetadata pollerMetadata
) {
return IntegrationFlows.from(aggregatorInboundChannel)
.wireTap(wtChannel)
.channel(aggregatingChannel)
.handle(aggregatorOutboundChannel)
.get();
}
好吧,如果有新消息要完成,它会到达聚合器,然后会立即发布这样的组(如果您的 ReleaseStrategy
这么说的话)。超时剩余团员继续等待排期
有可能想出智能算法来依靠与 MessageGroupStoreReaper
的单一通用时间表来决定我们是否需要释放该部分组或只是丢弃它。再一次:ReleaseStrategy
应该给我们一个线索来决定是否释放,即使是部分的。当发生丢弃并且我们希望将这些消息保留在聚合器中时,我们需要在一段时间后将它们重新发送回聚合器。过期后,组将从存储中删除,这发生在我们已经发送到丢弃通道时,所以最好延迟它们并让聚合器清理这些组,这样延迟后我们可以安全地将它们发送回新有效期的聚合器作为新组的一部分。
您可能还可以在释放正常组后迭代存储中的所有消息,以调整它们 headers 中的一些时间密钥以用于下一个过期时间。
我知道这很难,但确实没有任何 out-of-the-box 解决方案,因为它的设计目的不是为了影响我们刚刚处理过的其他群体...