Spring 集成:仅在处理完上次轮询的所有结果后才启动 JPA 轮询
Spring Integration: start JPA polling only when all the results of last polling has been processed
我想使用 Spring 集成 Java DSL 来实现以下流程:
- 每 2 小时在数据库中轮询一个 table 需要处理的文档的 returns id
- 对于每个 id,通过 HTTP 网关处理一个文档
- 将响应存储在数据库中
我有一个有效的 Java 代码可以执行这些步骤。我正在努力解决的另一个要求是,在处理完上次轮询的所有文档并将其存储在数据库中之前,不应该对下一轮文档进行轮询。
Spring 集成中是否有任何模式可用于满足此附加要求?
这是一个简化的代码 - 它会变得更加复杂,我会将文档的处理(HTTP 出站和持久化)拆分为单独的 类 / 流程:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
.handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
.entityClass(DocumentHeader.class)
.jpaQuery("from DocumentHeader d where d.modified > :modified")
.parameterExpression("modified", "payload"))
.handle(Http.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class))
.handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(true))
.get();
更新
按照 Artem 的建议,我正在尝试使用 SimpleActiveIdleMessageSourceAdvice
来实现它
class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {
public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
super(trigger);
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return false;
}
}
如果我理解正确的话,上面的代码会停止轮询。现在我不知道如何将这个 Advice 附加到 Jpa.inboundAdapter... 它似乎没有合适的方法(既不是 Advice 也不是 Spec Handler)。我在这里错过了一些明显的东西吗?我已经尝试将建议附加到 Jpa.retrievingGateway,但它根本不会改变流程。
更新2
检查这个问题以获得完整的解决方案:
我今天回答了类似的问题:。
你也可能在数据库级别上有一个技巧,不要让 table 中的新记录被锁定,而其他记录被锁定。或者您可以在流程末尾有一些 UPDATE
,而您的 SELECT
在它们分别更新之前不会看到适当的记录。
但无论如何,我针对该问题建议的任何方法也应该适用于此。
此外,您确实可以考虑依赖 SimpleActiveIdleMessageSourceAdvice
,因为您的解决方案已经基于 MessageSource
实现。
更新
对于您的用例,最好扩展 SimpleActiveIdleMessageSourceAdvice
并覆盖其 beforeReceive()
以检查您是否能够读取更多数据的某些状态。 idlePollPeriod
和 activePollPeriod
可能是相同的值:在两者之间更改它看起来没有意义,因为您将在读取下一组数据后立即进入空闲状态。
要检查状态,它实际上可能是一个简单的 AtomicBoolean
bean,您应该在处理当前文档集后更改它。这可能是在聚合器之后的东西或您可以在解决方案中使用的任何其他东西。
更新 2
要为您的 Jpa.inboundAdapter
使用 WaitUntilCompleted
,您应该有这样的配置:
IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))
注意 .advice(waitUntilCompleted())
,它是 pller 配置的一部分,指向您的建议 bean。
我想使用 Spring 集成 Java DSL 来实现以下流程:
- 每 2 小时在数据库中轮询一个 table 需要处理的文档的 returns id
- 对于每个 id,通过 HTTP 网关处理一个文档
- 将响应存储在数据库中
我有一个有效的 Java 代码可以执行这些步骤。我正在努力解决的另一个要求是,在处理完上次轮询的所有文档并将其存储在数据库中之前,不应该对下一轮文档进行轮询。
Spring 集成中是否有任何模式可用于满足此附加要求?
这是一个简化的代码 - 它会变得更加复杂,我会将文档的处理(HTTP 出站和持久化)拆分为单独的 类 / 流程:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
.handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
.entityClass(DocumentHeader.class)
.jpaQuery("from DocumentHeader d where d.modified > :modified")
.parameterExpression("modified", "payload"))
.handle(Http.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class))
.handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(true))
.get();
更新
按照 Artem 的建议,我正在尝试使用 SimpleActiveIdleMessageSourceAdvice
来实现它class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {
public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
super(trigger);
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return false;
}
}
如果我理解正确的话,上面的代码会停止轮询。现在我不知道如何将这个 Advice 附加到 Jpa.inboundAdapter... 它似乎没有合适的方法(既不是 Advice 也不是 Spec Handler)。我在这里错过了一些明显的东西吗?我已经尝试将建议附加到 Jpa.retrievingGateway,但它根本不会改变流程。
更新2
检查这个问题以获得完整的解决方案:
我今天回答了类似的问题:
你也可能在数据库级别上有一个技巧,不要让 table 中的新记录被锁定,而其他记录被锁定。或者您可以在流程末尾有一些 UPDATE
,而您的 SELECT
在它们分别更新之前不会看到适当的记录。
但无论如何,我针对该问题建议的任何方法也应该适用于此。
此外,您确实可以考虑依赖 SimpleActiveIdleMessageSourceAdvice
,因为您的解决方案已经基于 MessageSource
实现。
更新
对于您的用例,最好扩展 SimpleActiveIdleMessageSourceAdvice
并覆盖其 beforeReceive()
以检查您是否能够读取更多数据的某些状态。 idlePollPeriod
和 activePollPeriod
可能是相同的值:在两者之间更改它看起来没有意义,因为您将在读取下一组数据后立即进入空闲状态。
要检查状态,它实际上可能是一个简单的 AtomicBoolean
bean,您应该在处理当前文档集后更改它。这可能是在聚合器之后的东西或您可以在解决方案中使用的任何其他东西。
更新 2
要为您的 Jpa.inboundAdapter
使用 WaitUntilCompleted
,您应该有这样的配置:
IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))
注意 .advice(waitUntilCompleted())
,它是 pller 配置的一部分,指向您的建议 bean。