使用 Spring 集成防止消息被多次轮询?

Prevent a message to be polled more than once using Spring Integration?

我正在使用 Spring 集成和 JpaPollingChannelAdapter 从数据库中轮询具有特定状态的条目。如果找到条目,则会处理这些条目。

请问有什么办法可以避免一个条目被轮询两次?

我尝试使用 JpaOutboundGateway 来更新 JPA 实体上的状态,但是如果轮询频率很高,则该条目在获取之前会被多次轮询已更新。

这是我的配置:

@Bean
public MessageChannel defaultInputChannel() {
    return new DirectChannel();
}

@Bean
public JpaExecutor jpaFindAllExecutor() {
    JpaExecutor jpaExecutor = new JpaExecutor(this.entityManagerFactory);
    jpaExecutor.setNamedQuery("SELECT o FROM MyEntity o WHERE o.status=0");
    return jpaExecutor;
}

@Bean
@InboundChannelAdapter(poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.interval}", receiveTimeout = "${poller.receiveTimeout}"))
public MessageSource<?> pollingSource() {
    return new JpaPollingChannelAdapter(jpaFindAllExecutor());
}

JpaExecutor 附带一个选项,例如 deleteAfterPoll:

https://docs.spring.io/spring-integration/docs/current/reference/html/jpa.html#jpa-inbound-channel-adapter

Set this value to true if you want to delete the rows received after execution of the query. You must ensure that the component operates as part of a transaction.

如果你不喜欢DELETE。而是 UPDATE,考虑在您的实体上使用相应的 Hibernate 注释:

@SQLDelete(sql = "Update MyEntity set status = 1 where id = ?")

根据您的 @InboundChannelAdapterJpaOutboundGateway 应该也能正常工作。 您可能将处理从一个线程转移到另一个线程。只要您不离开轮询线程,就不会再轮询更多数据。