"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true." 在 Spring 集成 AWS
"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true." in Spring Integration AWS
更新:spring-integration-aws-2.3.4
中存在错误
我将 SFTP (SftpStreamingMessageSource) 作为源与 S3 作为目标集成。
我有类似的 Spring 集成配置:
@Bean
public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
return (metadata, message) -> {
if ( message.getPayload() instanceof DigestInputStream) {
metadata.setContentType( MediaType.APPLICATION_JSON_VALUE );
// can not read stream to manually compute MD5
// metadata.setContentMD5("BLABLA==");
// this is wrong approach: metadata.setContentMD5(BinaryUtils.toBase64((((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
}
};
}
@Bean
@InboundChannelAdapter(channel = "ftpStream")
public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
messageSource.setRemoteDirectory("foo");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
messageSource.setLoggingEnabled(true);
messageSource.setCountsEnabled(true);
return messageSource;
}
...
@Bean
@ServiceActivator(inputChannel = "ftpStream")
public MessageHandler s3MessageHandler(AmazonS3 amazonS3, S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
messageHandler.setLoggingEnabled(true);
messageHandler.setCountsEnabled(true);
messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
messageHandler.setKeyExpression(new ValueExpression<>("key"));
return messageHandler;
}
启动后出现以下错误
"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."
这是因为 ftpSource
在没有 mark/reset 支持的情况下生成 InputStream
有效载荷。我什至尝试使用 @Transformer
将 InputStream 转换为 BufferedInputStream
,例如正在关注
return new BufferedInputStream((InputStream) message.getPayload());
没有成功,因为那时我收到消息“java.io.IOException:流已关闭”,因为 S3MessageHandler:338
正在调用 Md5Utils.md5AsBase64(inputStream)
过早关闭流.
如何在 Spring Integration AWS 中轻松为所有消息生成 MD5?
我正在使用 spring-integration-aws-2.3.4.RELEASE
S3MessageHandler
这样做:
if (payload instanceof InputStream) {
InputStream inputStream = (InputStream) payload;
if (metadata.getContentMD5() == null) {
Assert.state(inputStream.markSupported(),
"For an upload InputStream with no MD5 digest metadata, "
+ "the markSupported() method must evaluate to true.");
String contentMd5 = Md5Utils.md5AsBase64(inputStream);
metadata.setContentMD5(contentMd5);
inputStream.reset();
}
putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}
Md5Utils.md5AsBase64()
最终关闭了一个 InputStream
- 对我们不利。
这是我们这边的疏忽。请提出 GH 问题,我们会尽快解决。或者随时提供贡献。
作为解决方法,我建议在此 S3MessageHandler
之前使用一个转换器,代码如下:
return org.springframework.util.StreamUtils.copyToByteArray(inputStream);
这样你就已经有了一个 byte[]
作为 S3MessageHandler
的有效负载,它将使用不同的分支进行处理:
else if (payload instanceof byte[]) {
byte[] payloadBytes = (byte[]) payload;
InputStream inputStream = new ByteArrayInputStream(payloadBytes);
if (metadata.getContentMD5() == null) {
String contentMd5 = Md5Utils.md5AsBase64(inputStream);
metadata.setContentMD5(contentMd5);
inputStream.reset();
}
if (metadata.getContentLength() == 0) {
metadata.setContentLength(payloadBytes.length);
}
putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}
更新:spring-integration-aws-2.3.4
中存在错误我将 SFTP (SftpStreamingMessageSource) 作为源与 S3 作为目标集成。 我有类似的 Spring 集成配置:
@Bean
public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
return (metadata, message) -> {
if ( message.getPayload() instanceof DigestInputStream) {
metadata.setContentType( MediaType.APPLICATION_JSON_VALUE );
// can not read stream to manually compute MD5
// metadata.setContentMD5("BLABLA==");
// this is wrong approach: metadata.setContentMD5(BinaryUtils.toBase64((((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
}
};
}
@Bean
@InboundChannelAdapter(channel = "ftpStream")
public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
messageSource.setRemoteDirectory("foo");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
messageSource.setLoggingEnabled(true);
messageSource.setCountsEnabled(true);
return messageSource;
}
...
@Bean
@ServiceActivator(inputChannel = "ftpStream")
public MessageHandler s3MessageHandler(AmazonS3 amazonS3, S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
messageHandler.setLoggingEnabled(true);
messageHandler.setCountsEnabled(true);
messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
messageHandler.setKeyExpression(new ValueExpression<>("key"));
return messageHandler;
}
启动后出现以下错误
"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."
这是因为 ftpSource
在没有 mark/reset 支持的情况下生成 InputStream
有效载荷。我什至尝试使用 @Transformer
将 InputStream 转换为 BufferedInputStream
,例如正在关注
return new BufferedInputStream((InputStream) message.getPayload());
没有成功,因为那时我收到消息“java.io.IOException:流已关闭”,因为 S3MessageHandler:338
正在调用 Md5Utils.md5AsBase64(inputStream)
过早关闭流.
如何在 Spring Integration AWS 中轻松为所有消息生成 MD5?
我正在使用 spring-integration-aws-2.3.4.RELEASE
S3MessageHandler
这样做:
if (payload instanceof InputStream) {
InputStream inputStream = (InputStream) payload;
if (metadata.getContentMD5() == null) {
Assert.state(inputStream.markSupported(),
"For an upload InputStream with no MD5 digest metadata, "
+ "the markSupported() method must evaluate to true.");
String contentMd5 = Md5Utils.md5AsBase64(inputStream);
metadata.setContentMD5(contentMd5);
inputStream.reset();
}
putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}
Md5Utils.md5AsBase64()
最终关闭了一个 InputStream
- 对我们不利。
这是我们这边的疏忽。请提出 GH 问题,我们会尽快解决。或者随时提供贡献。
作为解决方法,我建议在此 S3MessageHandler
之前使用一个转换器,代码如下:
return org.springframework.util.StreamUtils.copyToByteArray(inputStream);
这样你就已经有了一个 byte[]
作为 S3MessageHandler
的有效负载,它将使用不同的分支进行处理:
else if (payload instanceof byte[]) {
byte[] payloadBytes = (byte[]) payload;
InputStream inputStream = new ByteArrayInputStream(payloadBytes);
if (metadata.getContentMD5() == null) {
String contentMd5 = Md5Utils.md5AsBase64(inputStream);
metadata.setContentMD5(contentMd5);
inputStream.reset();
}
if (metadata.getContentLength() == 0) {
metadata.setContentLength(payloadBytes.length);
}
putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
}