Spring 集成中用于持久传送到 AMQP 代理的消息存储

Message store for persistence delivering to AMQP broker in Spring Integration

我正在尝试构建集成流,这将防止在传递到 AMQP 代理 (rabbitMQ) 期间丢失消息。 在代理停止的情况下,我看到一些意想不到的行为:

  1. 失败的消息正在保存到消息存储中,但不会保存很长时间。此流程不等待代理可用,即使代理仍处于停止状态,它也会从消息存储中提取消息
  2. 如果 rabbitmq 成功重启,来自消息存储的记录(如果它们仍然存在)不会被传送到队列。

请帮助我进行调查。代码示例:

 @Bean
public MessageChannel messageStoreBackedChannel() {
    return new QueueChannel(
            new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
    );
}

 @Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows
            .from("messageStoreBackedChannel")
            .channel("amqpMessageChannel")
            .get();
}

@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlows
            .from("amqpMessageChannel")
            .handle(message -> System.out.println(message.getPayload()))
            .get();
}


@Bean
public MessageChannel amqpMessageChannel() {
    return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}

@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
    var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());

    return jdbcChannelMessageStore;
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

考虑在 .from("messageStoreBackedChannel").channel("amqpMessageChannel")transactional() 之间配置一个端点。

像这样:

.from("messageStoreBackedChannel")
.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
.channel("amqpMessageChannel")

因此,每当传送到 amqpMessageChannel 失败时,事务将回滚并且失败的消息将返回到存储直到下一次轮询。

当然,当您连接到 RabbitMQ 时遇到错误,您可以停止 bridge 端点。但是您如何确定连接恢复了呢?..