Spring 集成持久消息存储
Spring integration persistent message store
我如何告诉 spring 集成在流中保留消息并在应用程序关闭时恢复?
我有这样的 spring 集成流程:
IntegrationFlows
.from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
.transform<File, JobLaunchRequest> { toAJobRequest(it, aJob) }
.handle(JobLaunchingGateway(jobLauncher))
.transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
.handle(JobLaunchingGateway(jobLauncher))
.handle { _ -> }
.get()
我尝试添加
@Bean
fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore? {
val jdbcChannelMessageStore = JdbcChannelMessageStore(dataSource)
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
return jdbcChannelMessageStore
}
没有成功。
你关于 JDBC MessageStore
的想法是正确的,唯一的问题是你没有展示你如何使用那个 jdbcChannelMessageStore
bean。
根据 documentation,您需要 QueueChannel
注入此 MessageStore
,但您的流量完全没有任何渠道自定义。
要在你的流程中建立一个持久的渠道,你需要有这样的东西:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
(您需要以某种方式在流程中注入一个 jdbcChannelMessageStore
bean)。
这样消息将存储在 INT_CHANNEL_MESSAGE
table 中的数据基础中。他们会在那边的车祸中幸存下来。
您还需要记住 QueueChannel
不可 订阅 ,它必须 轮询 。因此,必须使用 poller
配置流中此通道定义之后的端点。例如:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
.handle(JobLaunchingGateway(jobLauncher), e -> e.poller(p -> p.fixedDelay(1000)))
所有答案都在文档中:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-pollers
顺便说一句,我们有一个专门的项目来简化 Kotlin 体验:https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kotlin-dsl
我如何告诉 spring 集成在流中保留消息并在应用程序关闭时恢复?
我有这样的 spring 集成流程:
IntegrationFlows
.from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
.transform<File, JobLaunchRequest> { toAJobRequest(it, aJob) }
.handle(JobLaunchingGateway(jobLauncher))
.transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
.handle(JobLaunchingGateway(jobLauncher))
.handle { _ -> }
.get()
我尝试添加
@Bean
fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore? {
val jdbcChannelMessageStore = JdbcChannelMessageStore(dataSource)
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
return jdbcChannelMessageStore
}
没有成功。
你关于 JDBC MessageStore
的想法是正确的,唯一的问题是你没有展示你如何使用那个 jdbcChannelMessageStore
bean。
根据 documentation,您需要 QueueChannel
注入此 MessageStore
,但您的流量完全没有任何渠道自定义。
要在你的流程中建立一个持久的渠道,你需要有这样的东西:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
(您需要以某种方式在流程中注入一个 jdbcChannelMessageStore
bean)。
这样消息将存储在 INT_CHANNEL_MESSAGE
table 中的数据基础中。他们会在那边的车祸中幸存下来。
您还需要记住 QueueChannel
不可 订阅 ,它必须 轮询 。因此,必须使用 poller
配置流中此通道定义之后的端点。例如:
.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
.handle(JobLaunchingGateway(jobLauncher), e -> e.poller(p -> p.fixedDelay(1000)))
所有答案都在文档中:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-pollers
顺便说一句,我们有一个专门的项目来简化 Kotlin 体验:https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kotlin-dsl