如何使用 Spring 批处理集成在多个实例中同时处理多个大文件?
How to processing multiples large files at same time with multiples instances using Spring Batch Integration?
我为处理多个文件创建了一个 Spring 批处理集成项目,它运行得非常好。
虽然我正在写这个问题,但我有 四个 Pods 运行,但行为并不像我期望的那样,我期望同时处理 20 个文件(每个 Pod 五个)。
我的 pooler 设置使用以下参数:
poller-delay: 10000
max-message-per-poll: 5
我还使用 Redis 来存储文件和过滤器:
private CompositeFileListFilter<S3ObjectSummary> s3FileListFilter() {
return new CompositeFileListFilter<S3ObjectSummary>().addFilter(
new S3PersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory), "prefix-"))
.addFilter(new S3RegexPatternFileListFilter(".*\.csv$"));
}
似乎每个 pod 只处理一个文件,另一个奇怪的行为是 pods 中的一个在 Redis 中注册所有文件,所以其他 Pods 只获取新文件。
同时处理多个文件的最佳做法是什么以及如何解决?
在 S3InboundFileSynchronizingMessageSource
上查看此选项:
/**
* Set the maximum number of objects the source should fetch if it is necessary to
* fetch objects. Setting the
* maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
* @param maxFetchSize the max fetch size; a negative value means unlimited.
*/
@ManagedAttribute(description = "Maximum objects to fetch")
void setMaxFetchSize(int maxFetchSize);
这是文档:https://docs.spring.io/spring-integration/docs/current/reference/html/ftp.html#ftp-max-fetch
我为处理多个文件创建了一个 Spring 批处理集成项目,它运行得非常好。
虽然我正在写这个问题,但我有 四个 Pods 运行,但行为并不像我期望的那样,我期望同时处理 20 个文件(每个 Pod 五个)。
我的 pooler 设置使用以下参数:
poller-delay: 10000
max-message-per-poll: 5
我还使用 Redis 来存储文件和过滤器:
private CompositeFileListFilter<S3ObjectSummary> s3FileListFilter() {
return new CompositeFileListFilter<S3ObjectSummary>().addFilter(
new S3PersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory), "prefix-"))
.addFilter(new S3RegexPatternFileListFilter(".*\.csv$"));
}
似乎每个 pod 只处理一个文件,另一个奇怪的行为是 pods 中的一个在 Redis 中注册所有文件,所以其他 Pods 只获取新文件。
同时处理多个文件的最佳做法是什么以及如何解决?
在 S3InboundFileSynchronizingMessageSource
上查看此选项:
/**
* Set the maximum number of objects the source should fetch if it is necessary to
* fetch objects. Setting the
* maxFetchSize to 0 disables remote fetching, a negative value indicates no limit.
* @param maxFetchSize the max fetch size; a negative value means unlimited.
*/
@ManagedAttribute(description = "Maximum objects to fetch")
void setMaxFetchSize(int maxFetchSize);
这是文档:https://docs.spring.io/spring-integration/docs/current/reference/html/ftp.html#ftp-max-fetch