使用 Spring Aws 集成从 S3 对象流式传输对象
Streaming objects from S3 Object using Spring Aws Integration
我正在处理一个用例,我应该在其中轮询 S3 -> 读取流中的内容 -> 进行一些处理并将其上传到另一个存储桶,而不是将文件写入我的服务器。
我知道我可以在 Spring aws 集成中使用 S3StreamingMessageSource 来实现它,但我面临的问题是我不知道如何处理通过轮询收到的消息流
public class S3PollerConfigurationUsingStreaming {
@Value("${amazonProperties.bucketName}")
private String bucketName;
@Value("${amazonProperties.newBucket}")
private String newBucket;
@Autowired
private AmazonClientService amazonClient;
@Bean
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer();
}
@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
}
@Bean
public PollableChannel s3Channel() {
return new QueueChannel();
}
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.handle(streamFile())
.get();
}
}
有人可以帮我处理流的代码吗?
不确定您的问题是什么,但我看到您有多种顾虑。如果您使用消息注释(请参阅配置中的 @InboundChannelAdapter
),在 IntegrationFlow
定义中使用相同的 s3InboundStreamingMessageSource
有什么意义?
总之看来您已经为自己探索了一个 StreamTransformer
。这个有一个 charset
属性 将您的 InputStream
从远程 S3 资源转换为 String
。否则就returns一个byte[]
。其他一切都取决于您如何处理此转换后的内容。
此外,我认为没有理由将 s3Channel
作为 QueueChannel
,因为无论如何 @InboundChannelAdapter
.
都可以启动您的流程
从高处来看,我想说我们有更多问题要问你,反之亦然...
更新
不清楚您对 InputStream
处理的想法是什么,但这确实是事实,在 S3StreamingMessageSource
之后,您将在下一个处理程序中准确地将 InputStream
作为有效负载.
也不确定您的 streamFile()
是什么,但它必须真正期望 InputStream
作为来自请求消息负载的输入。
你也可以使用上面提到的StreamTransformer
:
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.transform(Transformers.fromStream("UTF-8"))
.get();
}
下一个 .handle()
将作为有效载荷准备好 String
。
我正在处理一个用例,我应该在其中轮询 S3 -> 读取流中的内容 -> 进行一些处理并将其上传到另一个存储桶,而不是将文件写入我的服务器。
我知道我可以在 Spring aws 集成中使用 S3StreamingMessageSource 来实现它,但我面临的问题是我不知道如何处理通过轮询收到的消息流
public class S3PollerConfigurationUsingStreaming {
@Value("${amazonProperties.bucketName}")
private String bucketName;
@Value("${amazonProperties.newBucket}")
private String newBucket;
@Autowired
private AmazonClientService amazonClient;
@Bean
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
"streaming"));
return messageSource;
}
@Bean
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer();
}
@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(amazonClient.getS3Client()));
}
@Bean
public PollableChannel s3Channel() {
return new QueueChannel();
}
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.handle(streamFile())
.get();
}
}
有人可以帮我处理流的代码吗?
不确定您的问题是什么,但我看到您有多种顾虑。如果您使用消息注释(请参阅配置中的 @InboundChannelAdapter
),在 IntegrationFlow
定义中使用相同的 s3InboundStreamingMessageSource
有什么意义?
总之看来您已经为自己探索了一个 StreamTransformer
。这个有一个 charset
属性 将您的 InputStream
从远程 S3 资源转换为 String
。否则就returns一个byte[]
。其他一切都取决于您如何处理此转换后的内容。
此外,我认为没有理由将 s3Channel
作为 QueueChannel
,因为无论如何 @InboundChannelAdapter
.
从高处来看,我想说我们有更多问题要问你,反之亦然...
更新
不清楚您对 InputStream
处理的想法是什么,但这确实是事实,在 S3StreamingMessageSource
之后,您将在下一个处理程序中准确地将 InputStream
作为有效负载.
也不确定您的 streamFile()
是什么,但它必须真正期望 InputStream
作为来自请求消息负载的输入。
你也可以使用上面提到的StreamTransformer
:
@Bean
IntegrationFlow fileStreamingFlow() {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
.transform(Transformers.fromStream("UTF-8"))
.get();
}
下一个 .handle()
将作为有效载荷准备好 String
。