如何在 spring 云数据流中将 gemfire 配置为聚合器模块的消息存储

How to configure gemfire as message store for aggregator module in spring cloud data flow

Link - https://github.com/spring-cloud-stream-app-starters/aggregator/tree/master/spring-cloud-starter-stream-processor-aggregator 没有为 gemfire 消息存储

列出 属性

通过 Spring 集成存在多种后端存储选项。您可以在 spring-cloud-starter-stream-processor-aggregator/README 中阅读更多相关信息。

Spring 关于此事的集成文档包含在 link 中,Gemfire section 可能会有用。

查看 MessageGroupStore implementation 也很有用,因为它是 aggregator 中存储选项的基础。

GemfireMessageStore配置如下:

@ConditionalOnClass(GemfireMessageStore.class)
@ConditionalOnProperty(prefix = AggregatorProperties.PREFIX,
        name = "message-store-type",
        havingValue = AggregatorProperties.MessageStoreType.GEMFIRE)
@Import(ClientCacheAutoConfiguration.class)
static class Gemfire {

    @Bean
    @ConditionalOnMissingBean
    public ClientRegionFactoryBean<?, ?> gemfireRegion(GemFireCache cache, AggregatorProperties properties) {
        ClientRegionFactoryBean<?, ?> clientRegionFactoryBean = new ClientRegionFactoryBean<>();
        clientRegionFactoryBean.setCache(cache);
        clientRegionFactoryBean.setName(properties.getMessageStoreEntity());
        return clientRegionFactoryBean;
    }

    @Bean
    public MessageGroupStore messageStore(Region<Object, Object> region) {
        return new GemfireMessageStore(region);
    }

}

重点是您始终可以用自己的覆盖 ClientRegionFactoryBean。 或者您可以考虑 ClientCacheAutoConfiguration 基于 @ClientCacheApplication,这反过来允许您拥有一个 ClientCacheConfigurer bean 并为您的客户端缓存配置提供足够的内容。包括配置和池。没错:它不在应用程序启动器配置级别上,您必须将一些自定义代码作为依赖项包含在最终的 uber jar 中,以用于特定于目标绑定器的应用程序。

有关如何构建它们的更多信息,请参见文档:https://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.RC1/reference/htmlsingle/#_patching_pre_built_applications