spring-与 AWS S3 集成重试失败 "get" 文件
spring-integration with AWS S3 retry failed "get" file
我们正在使用 spring-与 S3 的集成。我们有 s3-inbound-streaming-channel-adapter 从 S3 读取。发生的事情是,如果 "get" 失败,s3-inbound-streaming-channel-adapter 将文件名放入 "acceptOnceFilter" 并且不会在失败时重试。
Q1。我们想要的是当 s3-inbound-streaming-channel-adapter "gets" 来自 S3 的文件并且由于某种原因说这个 "get" 失败时......我们如何获得 s3-inbound-streaming-通道适配器重试此 "get" 同一文件的请求?
Q2。失败时,异常会从 s3-inbound-streaming-channel-adapter 发送到默认 "errorChannel"。异常中的消息是否包含失败的 "filename"?
<int:channel id="s3FileProcessingChannel">
<int:queue capacity="15"/>
</int:channel>
<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
<bean id="acceptOnceFilter"
class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore"/>
<constructor-arg index="1" value="streaming"/>
</bean>
<int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
channel="s3FileProcessingChannel"
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remotedirectoryexpression="'${s3.sourceBucket}/emm'">
<int:poller fixed-delay="1000" max-messages-per-poll="15"/>
</int-aws:s3-inbound-streaming-channel-adapter>
谢谢
通用
S3PersistentAcceptOnceFileListFilter
实现:
/**
* A {@link FileListFilter} that can be reset by removing a specific file from its
* state.
* @author Gary Russell
* @since 4.1.7
*
*/
public interface ResettableFileListFilter<F> extends FileListFilter<F> {
/**
* Remove the specified file from the filter so it will pass on the next attempt.
* @param f the element to remove.
* @return true if the file was removed as a result of this call.
*/
boolean remove(F f);
}
并且 S3StreamingMessageSource
像这样填充 headers:
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
当错误发生时,你只需要使用 FileHeaders.REMOTE_FILE
调用上面提到的 remove()
并且你的失败文件将在下一个轮询周期从 S3 中拾取。
我们正在使用 spring-与 S3 的集成。我们有 s3-inbound-streaming-channel-adapter 从 S3 读取。发生的事情是,如果 "get" 失败,s3-inbound-streaming-channel-adapter 将文件名放入 "acceptOnceFilter" 并且不会在失败时重试。
Q1。我们想要的是当 s3-inbound-streaming-channel-adapter "gets" 来自 S3 的文件并且由于某种原因说这个 "get" 失败时......我们如何获得 s3-inbound-streaming-通道适配器重试此 "get" 同一文件的请求?
Q2。失败时,异常会从 s3-inbound-streaming-channel-adapter 发送到默认 "errorChannel"。异常中的消息是否包含失败的 "filename"?
<int:channel id="s3FileProcessingChannel">
<int:queue capacity="15"/>
</int:channel>
<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
<bean id="acceptOnceFilter"
class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore"/>
<constructor-arg index="1" value="streaming"/>
</bean>
<int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
channel="s3FileProcessingChannel"
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remotedirectoryexpression="'${s3.sourceBucket}/emm'">
<int:poller fixed-delay="1000" max-messages-per-poll="15"/>
</int-aws:s3-inbound-streaming-channel-adapter>
谢谢 通用
S3PersistentAcceptOnceFileListFilter
实现:
/**
* A {@link FileListFilter} that can be reset by removing a specific file from its
* state.
* @author Gary Russell
* @since 4.1.7
*
*/
public interface ResettableFileListFilter<F> extends FileListFilter<F> {
/**
* Remove the specified file from the filter so it will pass on the next attempt.
* @param f the element to remove.
* @return true if the file was removed as a result of this call.
*/
boolean remove(F f);
}
并且 S3StreamingMessageSource
像这样填充 headers:
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
当错误发生时,你只需要使用 FileHeaders.REMOTE_FILE
调用上面提到的 remove()
并且你的失败文件将在下一个轮询周期从 S3 中拾取。