使用 Spring 批处理集成为 AWS S3 中的每个新文件启动 JobLaunchRequest
Launch JobLaunchRequest for each new file in AWS S3 with Spring Batch Integration
我正在关注文档:Spring Batch Integration combining with the Integration AWS 用于汇集 AWS S3。
但在某些情况下,每个文件的批量执行不起作用。
AWS S3 池工作正常,所以当我放入新文件或启动应用程序并且存储桶中有文件时,应用程序与本地目录同步:
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
return new S3SessionFactory(pAmazonS3);
}
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
synchronizer.setPreserveTimestamp(true);
synchronizer.setDeleteRemoteFiles(false);
synchronizer.setRemoteDirectory("remote-bucket");
//synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
return synchronizer;
}
@Bean
@InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
//messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
return messageSource;
}
@Bean("s3filesChannel")
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
我按照教程创建了 FileMessageToJobRequest
我不会把代码放在这里,因为它与文档相同
所以我创建了 bean IntegrationFlow 和 FileMessageToJobRequest:
@Bean
public IntegrationFlow integrationFlow(
S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource,
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(delimitedFileJob);
return fileMessageToJobRequest;
}
所以在 JobLaunchingGateway 我认为是问题所在:
如果我这样创建:
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
情况一(应用程序启动时Bucket为空):
- 我在 AWS S3 中上传了一个新文件;
- 池工作正常,文件出现在本地目录中;
- 但是 transform/job 没有被触发;
情况2(应用程序启动时Bucket已经有一个文件):
- 作业已启动:
2021-01-12 13:32:34.451 INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524 INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [delimitedFileJob]
- 如果我在 S3 中添加第二个文件,作业不会像案例 1 那样启动。
情况3(Bucket中有多个文件):
- 文件在本地目录中正确同步
- 但是作业只对最后一个文件执行一次。
所以按照 docs 我将我的网关更改为:
@Bean
@ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
//JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
//jobLaunchingGateway.setOutputChannel(replyChannel());
jobLaunchingGateway.setOutputChannel(s3FilesChannel());
return jobLaunchingGateway;
}
使用这个新的网关实现,如果我在 S3 中放入一个新文件,应用程序会做出反应但不会转换并给出错误:
Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest
并且如果存储桶中有两个文件(当应用程序启动时)FILE1.csv 和 FILE2.csv,作业会正确运行 FILE1.csv,但给出上面的错误FILE2.csv.
实现这样的东西的正确方法是什么?
需要说明的是,我想在此存储桶中接收数千个 csv 文件,使用 Spring 批处理进行读取和处理,但我还需要尽快从 S3 获取每个新文件。
提前致谢。
JobLaunchingGateway
确实只希望我们 JobLaunchRequest
作为有效载荷。
因为你在 S3InboundFileSynchronizingMessageSource
bean 定义上有那个 @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
,所以没有 FileMessageToJobRequest
转换器的那个 JobLaunchingGateway
有 @ServiceActivator(inputChannel = IN_CHANNEL_NAME
真的是错误的介于两者之间。
你的 integrationFlow
对我来说看起来不错,但是你真的需要从 S3InboundFileSynchronizingMessageSource
bean 中删除 @InboundChannelAdapter
并完全依赖 c.poller()
配置。
另一种方法是保留 @InboundChannelAdapter
,然后从 IN_CHANNEL_NAME
而不是 MessageSource
.
开始 IntegrationFlow
由于您有多个针对同一个 S3 源的轮询器,而且它们都基于同一个本地目录,所以看到这么多意外情况并不奇怪。
我正在关注文档:Spring Batch Integration combining with the Integration AWS 用于汇集 AWS S3。
但在某些情况下,每个文件的批量执行不起作用。
AWS S3 池工作正常,所以当我放入新文件或启动应用程序并且存储桶中有文件时,应用程序与本地目录同步:
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
return new S3SessionFactory(pAmazonS3);
}
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
synchronizer.setPreserveTimestamp(true);
synchronizer.setDeleteRemoteFiles(false);
synchronizer.setRemoteDirectory("remote-bucket");
//synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
return synchronizer;
}
@Bean
@InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
//messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
return messageSource;
}
@Bean("s3filesChannel")
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
我按照教程创建了 FileMessageToJobRequest
我不会把代码放在这里,因为它与文档相同
所以我创建了 bean IntegrationFlow 和 FileMessageToJobRequest:
@Bean
public IntegrationFlow integrationFlow(
S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource,
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(fileMessageToJobRequest())
.handle(jobLaunchingGateway())
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(delimitedFileJob);
return fileMessageToJobRequest;
}
所以在 JobLaunchingGateway 我认为是问题所在:
如果我这样创建:
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
情况一(应用程序启动时Bucket为空):
- 我在 AWS S3 中上传了一个新文件;
- 池工作正常,文件出现在本地目录中;
- 但是 transform/job 没有被触发;
情况2(应用程序启动时Bucket已经有一个文件):
- 作业已启动:
2021-01-12 13:32:34.451 INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524 INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [delimitedFileJob]
- 如果我在 S3 中添加第二个文件,作业不会像案例 1 那样启动。
情况3(Bucket中有多个文件):
- 文件在本地目录中正确同步
- 但是作业只对最后一个文件执行一次。
所以按照 docs 我将我的网关更改为:
@Bean
@ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
//JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
//jobLaunchingGateway.setOutputChannel(replyChannel());
jobLaunchingGateway.setOutputChannel(s3FilesChannel());
return jobLaunchingGateway;
}
使用这个新的网关实现,如果我在 S3 中放入一个新文件,应用程序会做出反应但不会转换并给出错误:
Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest
并且如果存储桶中有两个文件(当应用程序启动时)FILE1.csv 和 FILE2.csv,作业会正确运行 FILE1.csv,但给出上面的错误FILE2.csv.
实现这样的东西的正确方法是什么?
需要说明的是,我想在此存储桶中接收数千个 csv 文件,使用 Spring 批处理进行读取和处理,但我还需要尽快从 S3 获取每个新文件。
提前致谢。
JobLaunchingGateway
确实只希望我们 JobLaunchRequest
作为有效载荷。
因为你在 S3InboundFileSynchronizingMessageSource
bean 定义上有那个 @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
,所以没有 FileMessageToJobRequest
转换器的那个 JobLaunchingGateway
有 @ServiceActivator(inputChannel = IN_CHANNEL_NAME
真的是错误的介于两者之间。
你的 integrationFlow
对我来说看起来不错,但是你真的需要从 S3InboundFileSynchronizingMessageSource
bean 中删除 @InboundChannelAdapter
并完全依赖 c.poller()
配置。
另一种方法是保留 @InboundChannelAdapter
,然后从 IN_CHANNEL_NAME
而不是 MessageSource
.
IntegrationFlow
由于您有多个针对同一个 S3 源的轮询器,而且它们都基于同一个本地目录,所以看到这么多意外情况并不奇怪。