spring-integration-aws S3StreamingMessageSource 如何删除远程文件?
spring-integration-aws S3StreamingMessageSource how do I delete the remotefile?
我目前使用来自 spring 集成 aws 的 S3StreamingMessageSource。我将流传递给集成流。
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
public IntegrationFlow s3IntegrationFlow() {
return IntegrationFlows.from(s3InboundStreamingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
.transform(new S3ObjectInputStreamToStringTransformer())
.transform(Transformers.toJson())
.handle(Http.outboundGateway("http://localhost:8099/create").httpMethod(HttpMethod.POST).extractPayload(true))
.channel("nullChannel")
.get();
}
如何从 S3 中删除检索到的远程文件?
在 S3InboundFileSynchronizer 中有一个方法。
像这样:
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(factory);
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setPreserveTimestamp(true);
synchronizer.setRemoteDirectory(bucketName);
return synchronizer;
}
任何人都可以帮助我或告诉我一个好的解决方法吗?
我们的流媒体通道适配器没有远程文件的本地副本,因此我们无法猜测您将如何处理远程文件的 InputStream
。所以,这就是为什么 S3StreamingMessageSource
.
上没有 setDeleteRemoteFiles()
的原因
我看到你在做这样的事情S3ObjectInputStreamToStringTransformer
。请告诉我,这个定制变压器的原因是什么。已经有一个 StreamTransformer
和它的 charset
选项,远程文件的 InputStream
将被转换为字符串:
/**
* Construct an instance with the charset to convert the stream to a
* String; if null a {@code byte[]} will be produced instead.
* @param charset the charset.
*/
public StreamTransformer(String charset) {
此外:需要记住 StaticMessageHeaderAccessor.getCloseableResource(message)
必须在读取 InputStream
后关闭以避免资源泄漏。
您应该考虑使用 handle()
来调用 AmazonS3.deleteObject(String bucketName, String key)
API,而不是 .channel("nullChannel")
。 bucketName
分别存储在FileHeaders.REMOTE_DIRECTORY
和key
中FileHeaders.REMOTE_FILE
headers。
我目前使用来自 spring 集成 aws 的 S3StreamingMessageSource。我将流传递给集成流。
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
public IntegrationFlow s3IntegrationFlow() {
return IntegrationFlows.from(s3InboundStreamingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
.transform(new S3ObjectInputStreamToStringTransformer())
.transform(Transformers.toJson())
.handle(Http.outboundGateway("http://localhost:8099/create").httpMethod(HttpMethod.POST).extractPayload(true))
.channel("nullChannel")
.get();
}
如何从 S3 中删除检索到的远程文件?
在 S3InboundFileSynchronizer 中有一个方法。
像这样:
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(factory);
synchronizer.setDeleteRemoteFiles(true);
synchronizer.setPreserveTimestamp(true);
synchronizer.setRemoteDirectory(bucketName);
return synchronizer;
}
任何人都可以帮助我或告诉我一个好的解决方法吗?
我们的流媒体通道适配器没有远程文件的本地副本,因此我们无法猜测您将如何处理远程文件的 InputStream
。所以,这就是为什么 S3StreamingMessageSource
.
setDeleteRemoteFiles()
的原因
我看到你在做这样的事情S3ObjectInputStreamToStringTransformer
。请告诉我,这个定制变压器的原因是什么。已经有一个 StreamTransformer
和它的 charset
选项,远程文件的 InputStream
将被转换为字符串:
/**
* Construct an instance with the charset to convert the stream to a
* String; if null a {@code byte[]} will be produced instead.
* @param charset the charset.
*/
public StreamTransformer(String charset) {
此外:需要记住 StaticMessageHeaderAccessor.getCloseableResource(message)
必须在读取 InputStream
后关闭以避免资源泄漏。
您应该考虑使用 handle()
来调用 AmazonS3.deleteObject(String bucketName, String key)
API,而不是 .channel("nullChannel")
。 bucketName
分别存储在FileHeaders.REMOTE_DIRECTORY
和key
中FileHeaders.REMOTE_FILE
headers。