MessageSource 作为流程步骤
MessageSource as a flow step
我正在使用 SI-DSL 编写 SI 流程,所以让我先说我不知道这个问题是只与 Si-DSL 相关还是与 SI 和 SI-DSL 都相关。
我的用例是这样的
- 从队列中获取消息
- 将消息保存在数据库中 table
- 通过选择某些特定状态的消息来检索这些消息
在未来的某个时刻
- 进一步处理消息...
我的问题是第三步。如果第 3 步是第 1 步,那将很容易,因为我可以只使用 JdbcPollingChannelAdapter 作为 MessageSource。但是,我找不到在流程中间使用它的方法。所以,在 DSL 术语中,我可以做到(其中 dbDataMessageSource 是 JdbcPollingChannelAdapter)
IntegrationFlows
.from(dbDataMessageSource(), p -> p.poller(Pollers.fixedRate(24, TimeUnit.HOURS)))
但是我做不到
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(...))
.handle(new JdbcOutboundGateway(...)
.handle(dbDataMessageSource(), p -> p.poller(Pollers.fixedRate(24, TimeUnit.HOURS)))
我尝试使用网关、网桥、handleWithAdapter 而不是“.handle”,但注意到有效。
有什么想法吗?
干杯。
不知道为什么每个人都认为流必须只声明为单个 IntegrationFlow
对象,但没有人阻止你将它分成几个并连接到通道。一个可以以 .channel("foo")
结束,另一个可以以 IntegraitonFlows.from("foo")
开始。虽然看起来您可以省略该通道并将两个流连接成一个。
IntegrationFlow
只是让您最大限度地减少代码数量并真正省略一些样板代码片段。但是如果你的逻辑需要channel,或者有几个phase,分出来享受结果就好了。
所以,你的逻辑是这样的:
- 从队列中读取并存储在数据库中。是一个
IntegrationFlow
- 使用
dbDataMessageSource()
轮询数据库并进行处理。另一个独立的流程。
请阅读有关核心 Spring 集成的更多信息。 Java DSL 只是试图简化配置,但它仍然遵循与 SI Core 相同的 channel -> endpoint -> handler
原则。
好吧,我找到了按照我提到的方式执行此操作的方法。我可以重写 JdbcOutboundGateway,它包括 JdbcMessageHandler 和 JdbcPollingChannelAdapter,并使最后一个可配置,这样我就可以按照我想要的方式设置轮询时间和输出通道。
有点不可靠,但应该可以。
干杯。
我正在使用 SI-DSL 编写 SI 流程,所以让我先说我不知道这个问题是只与 Si-DSL 相关还是与 SI 和 SI-DSL 都相关。
我的用例是这样的
- 从队列中获取消息
- 将消息保存在数据库中 table
- 通过选择某些特定状态的消息来检索这些消息
在未来的某个时刻
- 进一步处理消息...
我的问题是第三步。如果第 3 步是第 1 步,那将很容易,因为我可以只使用 JdbcPollingChannelAdapter 作为 MessageSource。但是,我找不到在流程中间使用它的方法。所以,在 DSL 术语中,我可以做到(其中 dbDataMessageSource 是 JdbcPollingChannelAdapter)
IntegrationFlows
.from(dbDataMessageSource(), p -> p.poller(Pollers.fixedRate(24, TimeUnit.HOURS)))
但是我做不到
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(...))
.handle(new JdbcOutboundGateway(...)
.handle(dbDataMessageSource(), p -> p.poller(Pollers.fixedRate(24, TimeUnit.HOURS)))
我尝试使用网关、网桥、handleWithAdapter 而不是“.handle”,但注意到有效。
有什么想法吗?
干杯。
不知道为什么每个人都认为流必须只声明为单个 IntegrationFlow
对象,但没有人阻止你将它分成几个并连接到通道。一个可以以 .channel("foo")
结束,另一个可以以 IntegraitonFlows.from("foo")
开始。虽然看起来您可以省略该通道并将两个流连接成一个。
IntegrationFlow
只是让您最大限度地减少代码数量并真正省略一些样板代码片段。但是如果你的逻辑需要channel,或者有几个phase,分出来享受结果就好了。
所以,你的逻辑是这样的:
- 从队列中读取并存储在数据库中。是一个
IntegrationFlow
- 使用
dbDataMessageSource()
轮询数据库并进行处理。另一个独立的流程。
请阅读有关核心 Spring 集成的更多信息。 Java DSL 只是试图简化配置,但它仍然遵循与 SI Core 相同的 channel -> endpoint -> handler
原则。
好吧,我找到了按照我提到的方式执行此操作的方法。我可以重写 JdbcOutboundGateway,它包括 JdbcMessageHandler 和 JdbcPollingChannelAdapter,并使最后一个可配置,这样我就可以按照我想要的方式设置轮询时间和输出通道。
有点不可靠,但应该可以。
干杯。