使用 Spring Aws 集成从 S3 对象流式传输对象

Streaming objects from S3 Object using Spring Aws Integration

我正在处理一个用例,我应该在其中轮询 S3 -> 读取流中的内容 -> 进行一些处理并将其上传到另一个存储桶,而不是将文件写入我的服务器。

我知道我可以在 Spring aws 集成中使用 S3StreamingMessageSource 来实现它,但我面临的问题是我不知道如何处理通过轮询收到的消息流

public class S3PollerConfigurationUsingStreaming {
    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Value("${amazonProperties.newBucket}")
    private String newBucket;

    @Autowired
    private AmazonClientService amazonClient;

    @Bean
    @InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);
        messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                "streaming"));      
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "s3Channel", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer();
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
    }

    @Bean
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileStreamingFlow() {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
                .handle(streamFile())
                .get();
    }

}

有人可以帮我处理流的代码吗?

不确定您的问题是什么,但我看到您有多种顾虑。如果您使用消息注释(请参阅配置中的 @InboundChannelAdapter),在 IntegrationFlow 定义中使用相同的 s3InboundStreamingMessageSource 有什么意义?

总之看来您已经为自己探索了一个 StreamTransformer。这个有一个 charset 属性 将您的 InputStream 从远程 S3 资源转换为 String。否则就returns一个byte[]。其他一切都取决于您如何处理此转换后的内容。

此外,我认为没有理由将 s3Channel 作为 QueueChannel,因为无论如何 @InboundChannelAdapter.

都可以启动您的流程

从高处来看,我想说我们有更多问题要问你,反之亦然...

更新

不清楚您对 InputStream 处理的想法是什么,但这确实是事实,在 S3StreamingMessageSource 之后,您将在下一个处理程序中准确地将 InputStream 作为有效负载.

也不确定您的 streamFile() 是什么,但它必须真正期望 InputStream 作为来自请求消息负载的输入。 你也可以使用上面提到的StreamTransformer

@Bean
IntegrationFlow fileStreamingFlow() {
    return IntegrationFlows
            .from(s3InboundStreamingMessageSource(),
                    e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
            .transform(Transformers.fromStream("UTF-8"))
            .get();
}

下一个 .handle() 将作为有效载荷准备好 String