使用 rusoto 流式上传到 s3
Streamed upload to s3 with rusoto
如何使用 rusoto 将文件上传到 s3,而不将文件内容读取到内存(流式传输)?
使用此代码:
use std::fs::File;
use std::io::BufReader;
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};
fn main() {
let file = File::open("input.txt").unwrap();
let mut reader = BufReader::new(file);
let s3_client = S3Client::new(Region::UsEast1);
let result = s3_client.put_object(PutObjectRequest {
bucket: String::from("example_bucket"),
key: "example_filename".to_string(),
// this works:
// body: Some("example string".to_owned().into_bytes().into()),
// this doesn't:
body: Some(StreamingBody::new(reader)),
..Default::default()
}).sync().expect("could not upload");
}
我收到以下错误:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied
--> src/bin/example.rs:18:20
|
18 | body: Some(StreamingBody::new(reader)),
| ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>`
|
= note: required by `rusoto_core::stream::ByteStream::new`
好的。系好安全带,这很有趣。
StreamingBody
是 ByteStream
的别名,它本身采用参数类型 S: Stream<Item = Bytes, Error = Error> + Send + 'static
。简而言之,它需要是一个字节流。
BufReader
,显然,没有实现这个特性,因为它早于 futures 和 streams 很长一段时间。也没有简单的 Stream<Item = Bytes>
转换,您可以使用它来隐式转换成这个
第一个(注释的)示例起作用的原因是 String::into_bytes().into()
将遵循类型转换链:String
-> Vec<u8>
-> ByteStream
感谢实施From<Vec<u8>>
在 ByteStream
.
现在我们知道为什么这不起作用,我们可以修复它。有快速的方法,然后才有正确的方法。我会告诉你们两个。
快速方法
快速(但不是最优)的方法就是调用 File::read_to_end()
。这将填充一个 Vec<u8>
,然后您可以像以前一样使用它:
let mut buf:Vec<u8> = vec![];
file.read_to_end(&mut buf)?;
// buf now contains the entire file
这是低效且次优的,原因有二:
read_to_end()
是阻塞调用。根据您从何处读取文件,此阻塞时间可能被证明是不合理的
- 您需要拥有比文件中字节数更多的可用 RAM(+
Vec
定义的 64 位或 128 位 + 我们并不真正关心的一些额外内容)
好方法
将您的文件转换为实现 AsyncRead
的结构的好方法。由此,我们可以形成一个Stream
.
由于您已经有了 std::fs::File
,我们将首先将其转换为 tokio::fs::File
。这个实现了AsyncRead
,对后面很重要:
let tokio_file = tokio::fs::File::from_std(file);
据此,我们遗憾地需要做一些管道工作才能将其放入 Stream
。多个板条箱已经实现了它;从头开始的方法如下:
use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
.map(|r| r.as_ref().to_vec());
byte_stream
是 tokio_util::codec::FramedRead
的实例 implements Stream
with a specific item based on our decoder. As our decoder is BytesCodec
,因此您的流是 Stream<Item = BytesMut>
.
由于 playground 不知道 rusoto_core
,我无法向您展示完整的流程。但是,我可以向您展示您可以生成一个 Stream<Item = Vec<u8>, Error = io::Error>
,这是关键所在:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
这是一个具有即将推出的 Rusoto 异步等待语法的版本(对于 getObject 虽然应该很容易调整上传)...可能会在 Rusoto 0.4.3 中用于 public 消费:
https://github.com/brainstorm/rusoto-s3-async-await
即:
pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
let get_req = GetObjectRequest {
bucket,
key: object,
..Default::default()
};
let result = client
.get_object(get_req)
.await
.expect("Couldn't GET object");
println!("get object result: {:#?}", result);
let stream = result.body.unwrap();
let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();
assert!(body.len() > 0);
dbg!(body);
}
本质上是借用了integration testsuite itself, where you can find snippets of the upload version too。
如何使用 rusoto 将文件上传到 s3,而不将文件内容读取到内存(流式传输)?
使用此代码:
use std::fs::File;
use std::io::BufReader;
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};
fn main() {
let file = File::open("input.txt").unwrap();
let mut reader = BufReader::new(file);
let s3_client = S3Client::new(Region::UsEast1);
let result = s3_client.put_object(PutObjectRequest {
bucket: String::from("example_bucket"),
key: "example_filename".to_string(),
// this works:
// body: Some("example string".to_owned().into_bytes().into()),
// this doesn't:
body: Some(StreamingBody::new(reader)),
..Default::default()
}).sync().expect("could not upload");
}
我收到以下错误:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied --> src/bin/example.rs:18:20 | 18 | body: Some(StreamingBody::new(reader)), | ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>` | = note: required by `rusoto_core::stream::ByteStream::new`
好的。系好安全带,这很有趣。
StreamingBody
是 ByteStream
的别名,它本身采用参数类型 S: Stream<Item = Bytes, Error = Error> + Send + 'static
。简而言之,它需要是一个字节流。
BufReader
,显然,没有实现这个特性,因为它早于 futures 和 streams 很长一段时间。也没有简单的 Stream<Item = Bytes>
转换,您可以使用它来隐式转换成这个
第一个(注释的)示例起作用的原因是 String::into_bytes().into()
将遵循类型转换链:String
-> Vec<u8>
-> ByteStream
感谢实施From<Vec<u8>>
在 ByteStream
.
现在我们知道为什么这不起作用,我们可以修复它。有快速的方法,然后才有正确的方法。我会告诉你们两个。
快速方法
快速(但不是最优)的方法就是调用 File::read_to_end()
。这将填充一个 Vec<u8>
,然后您可以像以前一样使用它:
let mut buf:Vec<u8> = vec![];
file.read_to_end(&mut buf)?;
// buf now contains the entire file
这是低效且次优的,原因有二:
read_to_end()
是阻塞调用。根据您从何处读取文件,此阻塞时间可能被证明是不合理的- 您需要拥有比文件中字节数更多的可用 RAM(+
Vec
定义的 64 位或 128 位 + 我们并不真正关心的一些额外内容)
好方法
将您的文件转换为实现 AsyncRead
的结构的好方法。由此,我们可以形成一个Stream
.
由于您已经有了 std::fs::File
,我们将首先将其转换为 tokio::fs::File
。这个实现了AsyncRead
,对后面很重要:
let tokio_file = tokio::fs::File::from_std(file);
据此,我们遗憾地需要做一些管道工作才能将其放入 Stream
。多个板条箱已经实现了它;从头开始的方法如下:
use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
.map(|r| r.as_ref().to_vec());
byte_stream
是 tokio_util::codec::FramedRead
的实例 implements Stream
with a specific item based on our decoder. As our decoder is BytesCodec
,因此您的流是 Stream<Item = BytesMut>
.
由于 playground 不知道 rusoto_core
,我无法向您展示完整的流程。但是,我可以向您展示您可以生成一个 Stream<Item = Vec<u8>, Error = io::Error>
,这是关键所在:https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
这是一个具有即将推出的 Rusoto 异步等待语法的版本(对于 getObject 虽然应该很容易调整上传)...可能会在 Rusoto 0.4.3 中用于 public 消费:
https://github.com/brainstorm/rusoto-s3-async-await
即:
pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
let get_req = GetObjectRequest {
bucket,
key: object,
..Default::default()
};
let result = client
.get_object(get_req)
.await
.expect("Couldn't GET object");
println!("get object result: {:#?}", result);
let stream = result.body.unwrap();
let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();
assert!(body.len() > 0);
dbg!(body);
}
本质上是借用了integration testsuite itself, where you can find snippets of the upload version too。