抛出异常后如何处理SFTP入站适配器轮询中的剩余文件?
How to process remaining files in SFTP inbound adapter poll after an exception is thrown?
我已经创建了一个集成流程来从 SFTP 服务器读取文件并处理它们。我意识到,一旦其中一个文件出现错误(抛出异常),轮询就会停止,并且在下一次轮询之前不会处理任何其他文件。如何避免这种情况,不将文件标记为已处理,并处理该轮询中的剩余文件?
我的配置很简单。我正在使用每分钟触发一次的非事务性轮询器,max-message-per-poll
为 1000。SftpStreamingInboundChannelAdapterSpec
的 max-fetch-size
为 10,并使用复合文件列表过滤器和 SftpRegexPatternFileListFilter
和 SftpPersistentAcceptOnceFileListFilter
.
@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
.transform(new StreamTransformer())
.channel("processingChannel")
.get();
}
private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
.filter(fileListFilter(jdbcMetadataStore))
.maxFetchSize(10)
.remoteDirectory("/the-directory");
SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
sftpStreamingMessageSource.setFileInfoJson(false);
return sftpStreamingInboundChannelAdapterSpec;
}
private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000);
sourcePollingChannelAdapterSpec.autoStartup(true)
.poller(pollerSpec);
}
@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
String fileNameRegex = // get regex
SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");
CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
return compositeFileListFilter;
}
阅读 this answer 后,我尝试使用事务轮询器如下:
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000)
.transactional(dataSourceTransactionManager);
但结果是处理一个文件失败后,轮询停止,所有处理过的消息回滚,剩下的消息直到下一次轮询才处理。我从那个答案中了解到,每条消息都将在单独的事务中处理。
到目前为止我发现实现此目的的唯一方法是将处理代码包围在一个 try/catch 块中以捕获所有异常以避免中断轮询。在 catch 块中,我从复合文件列表过滤器中手动删除了 ChannelSftp.LsEntry
。为此,我需要在 SftpStreamingInboundChannelAdapterSpec
.
提供的 SftpStreamingMessageSource
中将 属性 fileInfoJson
设置为 false
我发现这种方法相当复杂,缺点是失败并从过滤器中删除的文件之后会立即重新处理,而不是在下面 poll.I 希望有一个更直接的解决方案来解决我的问题.
使用 try...catch 的解决方案是可行的方法。这确实是从进程中抛出的异常被冒泡到轮询器中并停止当前循环的事实 maxMessagesPerPoll
:
private Runnable createPoller() {
return () ->
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
});
}
那个pollForMessage()
是这样的:
private Message<?> pollForMessage() {
try {
return this.pollingTask.call();
}
catch (Exception e) {
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
else {
Message<?> failedMessage = null;
if (this.transactionSynchronizationFactory != null) {
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
if (resource instanceof IntegrationResourceHolder) {
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
}
}
throw new MessagingException(failedMessage, e); // NOSONAR (null failedMessage)
}
}
finally {
if (this.transactionSynchronizationFactory != null) {
Object resource = getResourceToBind();
if (TransactionSynchronizationManager.hasResource(resource)) {
TransactionSynchronizationManager.unbindResource(resource);
}
}
}
}
无论如何,仍然有一种方法可以在单个轮询周期中将一条消息与其他消息隔离开来。为此,您需要查看 Request Handler Advice Chain 并研究 ExpressionEvaluatingRequestHandlerAdvice
的解决方案:https://docs.spring.io/spring-integration/docs/current/reference/html/#message-handler-advice-chain
因此,您将其添加到下游的处理程序端点并在那里捕获异常并执行一些特定的错误处理而不是轮询器的 re-throwing 异常。
我已经创建了一个集成流程来从 SFTP 服务器读取文件并处理它们。我意识到,一旦其中一个文件出现错误(抛出异常),轮询就会停止,并且在下一次轮询之前不会处理任何其他文件。如何避免这种情况,不将文件标记为已处理,并处理该轮询中的剩余文件?
我的配置很简单。我正在使用每分钟触发一次的非事务性轮询器,max-message-per-poll
为 1000。SftpStreamingInboundChannelAdapterSpec
的 max-fetch-size
为 10,并使用复合文件列表过滤器和 SftpRegexPatternFileListFilter
和 SftpPersistentAcceptOnceFileListFilter
.
@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore), sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
.transform(new StreamTransformer())
.channel("processingChannel")
.get();
}
private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
.filter(fileListFilter(jdbcMetadataStore))
.maxFetchSize(10)
.remoteDirectory("/the-directory");
SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
sftpStreamingMessageSource.setFileInfoJson(false);
return sftpStreamingInboundChannelAdapterSpec;
}
private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000);
sourcePollingChannelAdapterSpec.autoStartup(true)
.poller(pollerSpec);
}
@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
String fileNameRegex = // get regex
SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");
CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
return compositeFileListFilter;
}
阅读 this answer 后,我尝试使用事务轮询器如下:
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
.maxMessagesPerPoll(1000)
.transactional(dataSourceTransactionManager);
但结果是处理一个文件失败后,轮询停止,所有处理过的消息回滚,剩下的消息直到下一次轮询才处理。我从那个答案中了解到,每条消息都将在单独的事务中处理。
到目前为止我发现实现此目的的唯一方法是将处理代码包围在一个 try/catch 块中以捕获所有异常以避免中断轮询。在 catch 块中,我从复合文件列表过滤器中手动删除了 ChannelSftp.LsEntry
。为此,我需要在 SftpStreamingInboundChannelAdapterSpec
.
SftpStreamingMessageSource
中将 属性 fileInfoJson
设置为 false
我发现这种方法相当复杂,缺点是失败并从过滤器中删除的文件之后会立即重新处理,而不是在下面 poll.I 希望有一个更直接的解决方案来解决我的问题.
使用 try...catch 的解决方案是可行的方法。这确实是从进程中抛出的异常被冒泡到轮询器中并停止当前循环的事实 maxMessagesPerPoll
:
private Runnable createPoller() {
return () ->
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (pollForMessage() == null) {
break;
}
count++;
}
});
}
那个pollForMessage()
是这样的:
private Message<?> pollForMessage() {
try {
return this.pollingTask.call();
}
catch (Exception e) {
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
else {
Message<?> failedMessage = null;
if (this.transactionSynchronizationFactory != null) {
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
if (resource instanceof IntegrationResourceHolder) {
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
}
}
throw new MessagingException(failedMessage, e); // NOSONAR (null failedMessage)
}
}
finally {
if (this.transactionSynchronizationFactory != null) {
Object resource = getResourceToBind();
if (TransactionSynchronizationManager.hasResource(resource)) {
TransactionSynchronizationManager.unbindResource(resource);
}
}
}
}
无论如何,仍然有一种方法可以在单个轮询周期中将一条消息与其他消息隔离开来。为此,您需要查看 Request Handler Advice Chain 并研究 ExpressionEvaluatingRequestHandlerAdvice
的解决方案:https://docs.spring.io/spring-integration/docs/current/reference/html/#message-handler-advice-chain
因此,您将其添加到下游的处理程序端点并在那里捕获异常并执行一些特定的错误处理而不是轮询器的 re-throwing 异常。