Spring 集成:入站文件适配器在服务重新启动时丢弃文件

Spring Integration: Inbound File Adapter drops files when service restarts

我们正在使用 Spring 集成的 S3InboundFileSynchronizingMessageSource 功能在本地同步,然后针对从 S3 存储桶中检索到的任何文件发送消息。

在同步之前,我们应用了几个 S3PersistentAcceptOnceFileListFilter 过滤器(检查文件的 TimeModified 和 Hash/ETag)以确保我们只同步“新”文件。

注意:我们使用 JdbcMetadataStore table 来保存之前通过过滤器的文件记录(对每个过滤器使用不同的 REGION)。

最后,对于 S3InboundFileSynchronizingMessageSource 本地过滤器,我们有一个 S3PersistentAcceptOnceFileListFilter FileSystemPersistentAcceptOnceFileListFilter -- 再次在 TimeModified 上再次持续存在,但在不同的地区。

问题是:如果服务在 之后重新启动,文件已通过第一个过滤器,但 消息源成功通过发送消息后,我们实际上 丢弃 文件,但从未实际处理它。

我们做错了什么?我们如何避免这种“文件丢失”问题?

我假设您对 localFilter 使用 FileSystemPersistentAcceptOnceFileListFilter,因为 S3PersistentAcceptOnceFileListFilter 不会在那里工作。

让我们看看您如何在配置中使用这些过滤器!我想知道为您的远程文件切换到 ChainFileListFilter 是否对您有所帮助。

查看文档:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-reading

编辑

if the service is restarted after the file has made it through the 1st filter but before the message source successfully sent the message along

我认为 Gary 是对的:您需要一个围绕该轮询操作的事务,其中还包括过滤器逻辑。

查看文档:https://docs.spring.io/spring-integration/docs/current/reference/html/jdbc.html#jdbc-metadata-store

这样,在文件的消息离开轮询通道适配器之前,不会提交 TX。因此,重新启动后,您将能够再次同步回滚的文件。只是因为它们不存在于商店中进行过滤。