spring-integration-aws S3StreamingMessageSource 如何删除远程文件?

spring-integration-aws S3StreamingMessageSource how do I delete the remotefile?

我目前使用来自 spring 集成 aws 的 S3StreamingMessageSource。我将流传递给集成流。

public MessageSource<InputStream> s3InboundStreamingMessageSource() {
    S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
    messageSource.setRemoteDirectory(bucketName);
    messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
            "streaming"));
    return messageSource;
}

@Bean
public IntegrationFlow s3IntegrationFlow() {
    return IntegrationFlows.from(s3InboundStreamingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS)))
            .transform(new S3ObjectInputStreamToStringTransformer())
            .transform(Transformers.toJson())
            .handle(Http.outboundGateway("http://localhost:8099/create").httpMethod(HttpMethod.POST).extractPayload(true))
            .channel("nullChannel")
            .get();
}

如何从 S3 中删除检索到的远程文件?

在 S3InboundFileSynchronizer 中有一个方法。

像这样:

    @Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(factory);
    synchronizer.setDeleteRemoteFiles(true);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setRemoteDirectory(bucketName);
    return synchronizer;
}

任何人都可以帮助我或告诉我一个好的解决方法吗?

我们的流媒体通道适配器没有远程文件的本地副本,因此我们无法猜测您将如何处理远程文件的 InputStream。所以,这就是为什么 S3StreamingMessageSource.

上没有 setDeleteRemoteFiles() 的原因

我看到你在做这样的事情S3ObjectInputStreamToStringTransformer。请告诉我,这个定制变压器的原因是什么。已经有一个 StreamTransformer 和它的 charset 选项,远程文件的 InputStream 将被转换为字符串:

/**
 * Construct an instance with the charset to convert the stream to a
 * String; if null a {@code byte[]} will be produced instead.
 * @param charset the charset.
 */
public StreamTransformer(String charset) {

此外:需要记住 StaticMessageHeaderAccessor.getCloseableResource(message) 必须在读取 InputStream 后关闭以避免资源泄漏。

您应该考虑使用 handle() 来调用 AmazonS3.deleteObject(String bucketName, String key) API,而不是 .channel("nullChannel")bucketName分别存储在FileHeaders.REMOTE_DIRECTORYkeyFileHeaders.REMOTE_FILEheaders。