Spring 集成 - 优先聚合器

Spring Integration - Priority aggregator

我有以下应用要求:

正如您在图片中看到的一个用例:三个消息已经聚合并等待下一秒发布(因为当前速率为 1msg/sec),但就在那个时候,MSGid:10 一起到达,并更新了 AGGREGATED 2,使其成为优先级第一的消息。所以在下一个 tick 时,我们没有释放 AGGREGATED 3,而是释放 AGGREGATED 2,因为它现在具有更高的优先级。

现在的问题是 - 我可以为此使用 Spring Integration Aggregator,因为我不知道它是否支持在聚合过程中对消息进行优先排序?我知道 groupTimeout,但那个只是调整单个消息组 - 不改变其他组的优先级。是否可以使用 MessageGroupStoreReaper 在新 MSG 到达时按优先级调整所有其他聚合消息?

更新

我做了一些这样的实现 - 现在看起来还不错 - 它在消息到达时聚合消息,比较器根据我的自定义逻辑对消息进行排序。

您认为这会不会有一些问题(并发等)?我可以在日志中看到,多次调用轮询器。这正常吗?

2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL

此外,这个注释 doit 方法是在运行时增加最大轮询消息数的正确方法吗?

@Bean
    public MessageChannel aggregatingChannel(){
        return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
                Comparator.comparingInt(x -> x),
                (m) -> {
                    ExampleDTO d = (ExampleDTO) m.getPayload();
                    return d.getId();
                }
        ));
    }

    class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
        private final Log logger = LogFactory.getLog(getClass());
        private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
        private final Function<Message<?>, K> keyExtractor;
        private final NavigableMap<K, Message<?>> keyToAggregatedMessage;

        public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
                                        Comparator<? super K> comparator,
                                        Function<Message<?>, K> keyExtractor) {
            this.accumulator = accumulator;
            this.keyExtractor = keyExtractor;
            keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
        }

        @Override
        public Iterator<Message<?>> iterator() {
            return keyToAggregatedMessage.values().iterator();
        }

        @Override
        public int size() {
            return keyToAggregatedMessage.size();
        }

        @Override
        public boolean offer(Message<?> m) {
            logger.info("OFFER");
            return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
        }

        @Override
        public Message<?> poll() {
            logger.info("POLL");
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
            return m != null ? m.getValue() : null;
        }

        @Override
        public Message<?> peek() {
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
            return m!= null ? m.getValue() : null;
        }
    }

//    @Scheduled(fixedDelay = 10*1000)
//    public void doit(){
//        System.out.println("INCREASE POLL");
//        pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
//    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerMetadata(){
        PollerMetadata metadata = new PollerMetadata();
        metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
        metadata.setMaxMessagesPerPoll(1);
        return metadata;
    }

    @Bean
    public IntegrationFlow aggregatingFlow(
            AmqpInboundChannelAdapter aggregatorInboundChannel,
            AmqpOutboundEndpoint aggregatorOutboundChannel,
            MessageChannel wtChannel,
            MessageChannel aggregatingChannel,
            PollerMetadata pollerMetadata
    ) {
    return IntegrationFlows.from(aggregatorInboundChannel)
        .wireTap(wtChannel)
        .channel(aggregatingChannel)
        .handle(aggregatorOutboundChannel)
        .get();
    }

好吧,如果有新消息要完成,它会到达聚合器,然后会立即发布这样的组(如果您的 ReleaseStrategy 这么说的话)。超时剩余团员继续等待排期

有可能想出智能算法来依靠与 MessageGroupStoreReaper 的单一通用时间表来决定我们是否需要释放该部分组或只是丢弃它。再一次:ReleaseStrategy 应该给我们一个线索来决定是否释放,即使是部分的。当发生丢弃并且我们希望将这些消息保留在聚合器中时,我们需要在一段时间后将它们重新发送回聚合器。过期后,组将从存储中删除,这发生在我们已经发送到丢弃通道时,所以最好延迟它们并让聚合器清理这些组,这样延迟后我们可以安全地将它们发送回新有效期的聚合器作为新组的一部分。

您可能还可以在释放正常组后迭代存储中的所有消息,以调整它们 headers 中的一些时间密钥以用于下一个过期时间。

我知道这很难,但确实没有任何 out-of-the-box 解决方案,因为它的设计目的不是为了影响我们刚刚处理过的其他群体...