Spring 集成持久消息存储

Spring integration persistent message store

我如何告诉 spring 集成在流中保留消息并在应用程序关闭时恢复?

我有这样的 spring 集成流程:

IntegrationFlows
            .from(ftpInboundAdapter) { c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)) }
            .transform<File, JobLaunchRequest> { toAJobRequest(it, aJob) }
            .handle(JobLaunchingGateway(jobLauncher))
            .transform<JobExecution, JobLaunchRequest> { toBJobRequest(bJob) }
            .handle(JobLaunchingGateway(jobLauncher))
            .handle { _ -> }
            .get()

我尝试添加

    @Bean
    fun jdbcChannelMessageStore(dataSource: DataSource): JdbcChannelMessageStore? {
        val jdbcChannelMessageStore = JdbcChannelMessageStore(dataSource)
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        return jdbcChannelMessageStore
    }

没有成功。

你关于 JDBC MessageStore 的想法是正确的,唯一的问题是你没有展示你如何使用那个 jdbcChannelMessageStore bean。

根据 documentation,您需要 QueueChannel 注入此 MessageStore,但您的流量完全没有任何渠道自定义。

要在你的流程中建立一个持久的渠道,你需要有这样的东西:

.channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }

(您需要以某种方式在流程中注入一个 jdbcChannelMessageStore bean)。

这样消息将存储在 INT_CHANNEL_MESSAGE table 中的数据基础中。他们会在那边的车祸中幸存下来。

您还需要记住 QueueChannel 不可 订阅 ,它必须 轮询 。因此,必须使用 poller 配置流中此通道定义之后的端点。例如:

  .channel { c -> c.queue(jdbcChannelMessageStore, "persistentGroup") }
  .handle(JobLaunchingGateway(jobLauncher), e -> e.poller(p -> p.fixedDelay(1000)))

所有答案都在文档中:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-pollers

顺便说一句,我们有一个专门的项目来简化 Kotlin 体验:https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-kotlin-dsl