Poller/Adapter s3-inbound-channel-adapter 无法正常工作

Poller/Adapter in s3-inbound-channel-adapter not working properly

我的方案是,每当文件被修改时,仅将一个文件从 AWS S3 存储桶传输到 EC2 实例一次。我使用以下配置并在服务器启动时手动启动适配器。

问题是服务器启动时重复执行 5 或 6 次。看起来不同的线程正在执行。我能够在日志中看到不同的任务执行器不知道它是轮询器问题还是适配器问题。

我正在使用服务激活器根据 S3 位置中的文件更改执行一些其他操作。

注意:此问题仅在启动时出现一次。进一步文件修改工作正常。

配置:

<bean id="s3SessionFactory" 
            class="org.springframework.integration.aws.support.S3SessionFactory"></bean>
        <bean id="acceptOnceFilter"
            class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
        <task:executor id="s3PollingExecutor" pool-size="1" queue-capacity="10" />
        <integration:channel id="s3FilesChannel"/>
        <int-aws:s3-inbound-channel-adapter id="s3FileInbound"
                     channel="s3FilesChannel" 
                     session-factory="s3SessionFactory" 
                     auto-create-local-directory="false"
                     delete-remote-files="false" 
                     preserve-timestamp="true"
                     filter="acceptOnceFilter"
                     local-directory="local_directory"
                     auto-startup="false" 
                     remote-directory="s3_bucket">
            <integration:poller id="s3FilesChannelPoller" 
                                fixed-rate="1000" 
                                max-messages-per-poll="1" time-unit="MILLISECONDS" 
                                task-executor="s3PollingExecutor">
            </integration:poller>
        </int-aws:s3-inbound-channel-adapter>
        <integration:service-activator id="s3FilesChannelWatcher" 
                                       input-channel="s3FilesChannel" 
                                       output-channel="nullChannel"
                                       ref="configurationFileWatcher" 
                                       method="getConfigurationFileWatcher">
        </integration:service-activator>                                      

按照你的建议,我尝试了以下方法。

<bean id="acceptOnceFilterRegion"
      class="cS3FileFilterOnLastModifiedTime">
    <constructor-arg index="0" ref="metaDataStoreRegion"/>
    <constructor-arg index="1" value="*"/>
</bean>                                                                                                

添加了检查上次修改时间的逻辑

import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import com.amazonaws.services.s3.model.S3ObjectSummary;
public class S3FileFilterOnLastModifiedTime extends S3PersistentAcceptOnceFileListFilter {

    Long delayTime = 1000L;

    public S3FileFilterOnLastModifiedTime(ConcurrentMetadataStore store, String prefix) {
        super(store, prefix);
    }

    @Override
    public boolean accept(S3ObjectSummary file) {
        long lastModified = modified(file);
        long currentTime = System.currentTimeMillis();
        long timeDifference = currentTime - lastModified;
        return timeDifference > delayTime;
    }   
} 

还是没有希望.logs是这样的......

[INFO ] 2018-10-11 11:22:10,888 [s3PollingExecutor-1] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-2] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-3] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-4] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-5] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,894 [s3PollingExecutor-6] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully

我认为您忽略了这样一个事实,即您的 s3_bucket 在启动时不是空的,并且 <int-aws:s3-inbound-channel-adapter> 在启动时会拾取所有文件。这就是您如何看待这几个任务:每个文件的每个任务。

如果你真的只担心那个桶中的新文件,你需要考虑不要使用内存中的 AcceptOnceFileListFilter,而是切换到一些基于共享 MetadataStore 的持久实现执行。为此,在 spring-integration-awsDynamoDbMetadataStore 中有一个 S3PersistentAcceptOnceFileListFilter 以将过滤结果保存到 AWS 上的 DynamoDb 中:https://github.com/spring-projects/spring-integration-aws#metadata-store-for-amazon-dynamodb