Spring 集成中用于持久传送到 AMQP 代理的消息存储
Message store for persistence delivering to AMQP broker in Spring Integration
我正在尝试构建集成流,这将防止在传递到 AMQP 代理 (rabbitMQ) 期间丢失消息。
在代理停止的情况下,我看到一些意想不到的行为:
- 失败的消息正在保存到消息存储中,但不会保存很长时间。此流程不等待代理可用,即使代理仍处于停止状态,它也会从消息存储中提取消息
- 如果 rabbitmq 成功重启,来自消息存储的记录(如果它们仍然存在)不会被传送到队列。
请帮助我进行调查。代码示例:
@Bean
public MessageChannel messageStoreBackedChannel() {
return new QueueChannel(
new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
);
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows
.from("messageStoreBackedChannel")
.channel("amqpMessageChannel")
.get();
}
@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlows
.from("amqpMessageChannel")
.handle(message -> System.out.println(message.getPayload()))
.get();
}
@Bean
public MessageChannel amqpMessageChannel() {
return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
考虑在 .from("messageStoreBackedChannel").channel("amqpMessageChannel")
和 transactional()
之间配置一个端点。
像这样:
.from("messageStoreBackedChannel")
.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
.channel("amqpMessageChannel")
因此,每当传送到 amqpMessageChannel
失败时,事务将回滚并且失败的消息将返回到存储直到下一次轮询。
当然,当您连接到 RabbitMQ 时遇到错误,您可以停止 bridge
端点。但是您如何确定连接恢复了呢?..
我正在尝试构建集成流,这将防止在传递到 AMQP 代理 (rabbitMQ) 期间丢失消息。 在代理停止的情况下,我看到一些意想不到的行为:
- 失败的消息正在保存到消息存储中,但不会保存很长时间。此流程不等待代理可用,即使代理仍处于停止状态,它也会从消息存储中提取消息
- 如果 rabbitmq 成功重启,来自消息存储的记录(如果它们仍然存在)不会被传送到队列。
请帮助我进行调查。代码示例:
@Bean
public MessageChannel messageStoreBackedChannel() {
return new QueueChannel(
new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
);
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows
.from("messageStoreBackedChannel")
.channel("amqpMessageChannel")
.get();
}
@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlows
.from("amqpMessageChannel")
.handle(message -> System.out.println(message.getPayload()))
.get();
}
@Bean
public MessageChannel amqpMessageChannel() {
return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
考虑在 .from("messageStoreBackedChannel").channel("amqpMessageChannel")
和 transactional()
之间配置一个端点。
像这样:
.from("messageStoreBackedChannel")
.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
.channel("amqpMessageChannel")
因此,每当传送到 amqpMessageChannel
失败时,事务将回滚并且失败的消息将返回到存储直到下一次轮询。
当然,当您连接到 RabbitMQ 时遇到错误,您可以停止 bridge
端点。但是您如何确定连接恢复了呢?..