使用 Spring 个集成组件关联 2 个 JMS 队列之间的消息

Correlate messages between 2 JMS queues using Spring integration components

我有 2 个 JMS 队列,我的应用程序使用 Jms.messageDrivenChannelAdapter(...) 组件订阅了它们。

第一个队列接收 Paid 类型的消息。第二个队列接收 Reversal.

类型的消息

业务场景定义类型Paid和类型Reversal的消息之间的关联。

Reversal 应该等待 Paid 才能被处理。

如何通过 Spring 集成实现这样的 "wait" 模式?

是否可以关联 2 个 JMS 队列之间的消息?

the documentation about the Aggregator

聚合器使用某种关联策略关联消息,并根据某种发布策略发布组。

The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.

默认情况下,输出负载是分组消息负载的列表,但您可以提供自定义输出处理器。

编辑

@SpringBootApplication
public class So55299268Application {

    public static void main(String[] args) {
        SpringApplication.run(So55299268Application.class, args);
    }

    @Bean
    public IntegrationFlow in1(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination("queue1"))
                .channel("aggregator.input")
                .get();
    }

    @Bean
    public IntegrationFlow in2(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination("queue2"))
                .channel("aggregator.input")
                .get();
    }

    @Bean
    public IntegrationFlow aggregator() {
        return f -> f
                .aggregate(a -> a
                        .correlationExpression("headers.jms_correlationId")
                        .releaseExpression("size() == 2")
                        .expireGroupsUponCompletion(true)
                        .expireGroupsUponTimeout(true)
                        .groupTimeout(5_000L)
                        .discardChannel("discards.input"))
                .handle(System.out::println);
    }

    @Bean
    public IntegrationFlow discards() {
        return f -> f.handle((p, h) -> {
            System.out.println("Aggregation timed out for " + p);
            return null;
        });
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            send(template, "one", "two");
            send(template, "three", null);
        };
    }

    private void send(JmsTemplate template, String one, String two) {
        template.convertAndSend("queue1", one, m -> {
            m.setJMSCorrelationID(one);
            return m;
        });
        if (two != null) {
            template.convertAndSend("queue2", two, m -> {
                m.setJMSCorrelationID(one);
                return m;
            });
        }
    }

}

GenericMessage [payload=[two, one], headers={jms_redelivered=false, jms_destination=queue://queue1, jms_correlationId=one, id=784535fe-8861-1b22-2cfa-cc2e67763674, priority=4, jms_timestamp=1553290921442, jms_messageId=ID:Gollum2.local-55540-1553290921241-4:1:3:1:1, timestamp=1553290921457}]

2019-03-22 17:42:06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler : Expiring MessageGroup with correlationKey[three]

Aggregation timed out for three