Spring 集成来自多个 s3 存储桶的 AWS s3-inbound-streaming-channel-adapter 流

Spring Integration AWS s3-inbound-streaming-channel-adapter stream from multiple s3 buckets

我正在使用基于 XML 的 spring 集成并使用 s3-inbound-streaming-channel-adapter 单个 s3 存储桶进行流式传输。

我们现在需要从 两个 s3 存储桶进行流式传输。

那么 s3-inbound-streaming-channel-adapter 是否可以从 多个 桶中流式传输?

或者我是否需要为每个 s3 存储桶创建一个单独的s3-inbound-streaming-channel-adapter

这是我目前对单个 s3 存储桶的设置,它确实有效。

<int-aws:s3-inbound-streaming-channel-adapter 
channel="s3Channel"
session-factory="s3SessionFactory" 
filter="acceptOnceFilter"
remote-directory-expression="'bucket-1'">
    <int:poller fixed-rate="1000"/>
</int-aws:s3-inbound-streaming-channel-adapter>

提前致谢。

更新:

正如下面 Artem Bilan 所提到的,我最终得到了 两个 s3-inbound-streaming-channel-adapter

但是,对于每个入站适配器,我必须分别声明 acceptOnceFilter 和 metadataStore 的实例

这是因为如果我只有一个 acceptOnceFilter 和 metadataStore 的实例并且它们被两个入站适配器共享,那么一些奇怪的循环就会开始发生。

例如 当 file_1.csv 到达 bucket-1 并得到处理,然后如果你把相同的 file_1.csv 在 bucket-2 然后奇怪的循环开始发生。不知道为什么!所以我最终为每个入站适配器创建了 acceptOnceFilter 和 metadataStore。

`

    <!-- ===================================================== -->
    <!-- Region 1 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilter"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStore"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
                                                  channel="s3Channel"
                                                  session-factory="s3SessionFactory"
                                                  filter="acceptOnceFilter"
                                                  remote-directory-expression="'${s3.bucketOne.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3Channel">
        <int:queue capacity="50"/>
    </int:channel>

    <!-- ===================================================== -->
    <!-- Region 2 s3-inbound-streaming-channel-adapter setting -->
    <!-- ===================================================== -->

    <bean id="metadataStoreRegion2" class="org.springframework.integration.metadata.SimpleMetadataStore"/>

    <bean id="acceptOnceFilterRegion2"
          class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
        <constructor-arg index="0" ref="metadataStoreRegion2"/>
        <constructor-arg index="1" value="streaming"/>
    </bean>

    <int-aws:s3-inbound-streaming-channel-adapter id="s3Region2"
                                                  channel="s3ChannelRegion2"
                                                  session-factory="s3SessionFactoryRegion2"
                                                  filter="acceptOnceFilterRegion2"
                                                  remote-directory-expression="'${s3.bucketTwo.name}'">
        <int:poller fixed-rate="1000"/>
    </int-aws:s3-inbound-streaming-channel-adapter>

    <int:channel id="s3ChannelRegion2">
        <int:queue capacity="50"/>
    </int:channel>

`

没错,目前的实现只支持单个 remote directory 定期轮询。此时此刻,我们确实正在努力将这样的解决方案形式化为开箱即用的功能。对于 (S)FTP 支持也报告了类似的请求,尤其是在配置过程中事先不知道目标目录的情况下。

如果您为目录的每个通道适配器配置多个通道适配器不是什么大问题,那就太好了。您始终可以将来自它们的消息发送到同一频道进行处理。

否则您可以考虑通过以下方式循环存储桶列表:

  <xsd:attribute name="remote-directory-expression" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
                Specify a SpEL expression which will be used to evaluate the directory
                path to where the files will be transferred
                (e.g., "headers.['remote_dir'] + '/myTransfers'" for outbound endpoints)
                There is no root object (message) for inbound endpoints
                (e.g., "@someBean.fetchDirectory");
            </xsd:documentation>
        </xsd:annotation>
    </xsd:attribute>

在一些豆子中。