Rusoto 使用 sigv4 流式上传

Rusoto streamed upload using sigv4

我在以流式方式向 S3 上传文件时遇到了问题:

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    // Set up logging before anything else so the upload's debug output (RUST_LOG) is visible.
    pretty_env_logger::init();

    let source = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&source).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),

        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts any `AsyncRead` into a stream of immutable `Bytes` chunks.
///
/// `BytesCodec` yields each chunk it reads as a mutable buffer; freezing it produces the
/// `Bytes` item type. Read errors are passed through unchanged.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
where
    R: AsyncRead,
{
    let decoder = codec::BytesCodec::new();
    let frames = codec::FramedRead::new(r, decoder);
    frames.map_ok(|chunk| chunk.freeze())
}

我使用 `dd if=/dev/zero of=/tmp/test.bin bs=4k count=500` 生成了二进制测试文件。

尽管我还没有完全弄懂 Rust 的 future,但我只是想以尽可能少的内存占用把文件转储到 S3 中。

运行时,我通过调试日志得到了以下输出(已省略可能敏感的信息):

$ RUST_LOG=debug cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.36s
     Running `target/debug/uploader`
 DEBUG uploader > copy_to_s3: "/tmp/test.bin"
 DEBUG uploader > filename: "test.bin"
 DEBUG rusoto_core::request > Full request:
 method: PUT
 final_uri: https://s3.eu-west-2.amazonaws.com/.../test.bin
Headers:

 DEBUG rusoto_core::request > authorization:"AWS4-HMAC-SHA256 Credential=.../20200408/eu-west-2/s3/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-server-side-encryption, Signature=..."
 DEBUG rusoto_core::request > content-type:"application/octet-stream"
 DEBUG rusoto_core::request > host:"s3.eu-west-2.amazonaws.com"
 DEBUG rusoto_core::request > x-amz-content-sha256:"UNSIGNED-PAYLOAD"
 DEBUG rusoto_core::request > x-amz-date:"20200408T173930Z"
 DEBUG rusoto_core::request > x-amz-security-token:"..."
 DEBUG rusoto_core::request > x-amz-server-side-encryption:"AES256"
 DEBUG rusoto_core::request > user-agent:"rusoto/0.43.0 rust/1.42.0 macos"
 DEBUG hyper::client::connect::dns > resolving host="s3.eu-west-2.amazonaws.com"
 DEBUG hyper::client::connect::http > connecting to 52.95.148.48:443
 DEBUG hyper::client::connect::http > connected to 52.95.148.48:443
 DEBUG hyper::proto::h1::io         > flushed 1070 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 147600 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 16405 bytes
 DEBUG hyper::proto::h1::io         > read 291 bytes
 DEBUG hyper::proto::h1::io         > parsed 7 headers
 DEBUG hyper::proto::h1::conn       > incoming body is chunked encoding
 DEBUG hyper::proto::h1::io         > read 345 bytes
 DEBUG hyper::proto::h1::decode     > incoming chunked header: 0x14D (333 bytes)
 DEBUG hyper::proto::h1::conn       > incoming body completed
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "Header" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "RequestId" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "HostId" in error response.
 ERROR uploader                       > Failure: Unknown(BufferedHttpResponse {status: 501, body: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>3F1A03D67D81CCAB</RequestId><HostId>...=</HostId></Error>", headers: {"x-amz-request-id": "3F1A03D67D81CCAB", "x-amz-id-2": "...", "content-type": "application/xml", "transfer-encoding": "chunked", "date": "Wed, 08 Apr 2020 17:39:30 GMT", "connection": "close", "server": "AmazonS3"} })
 DEBUG uploader                       > DONE: copy_to_s3: "/tmp/test.bin"

我认为这说明请求并没有使用 sigv4 签名上传,但我不确定。

从调试输出来看,文件的大部分内容似乎都已成功地分块发送,但随后却出错了……

既然我推测它发送的是 sigv2 而不是 sigv4 签名,该如何让它发送 sigv4 headers?如果问题不在这里,我遗漏了什么?

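应指定内容长度(`content_length`)。日志中的 `AWS4-HMAC-SHA256` 表明请求其实已经使用了 sigv4 签名。真正的问题在于:未指定长度时,请求体会带着 `Transfer-Encoding` header 流式发送,而 S3 的 PutObject 不支持这种方式,因此返回 501 NotImplemented(错误信息中的 `<Header>Transfer-Encoding</Header>` 指出了这一点)。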

修改以下部分:

let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Without an explicit length the streamed body is sent with a Transfer-Encoding
        // header, which S3 rejects with 501 NotImplemented.
        // The value below is hard-coded for the test file produced by
        // dd if=/dev/zero of=/tmp/test.bin bs=4k count=500 (4096 * 500 bytes).
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

修复后的完整示例:

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    // Set up logging before anything else so the upload's debug output (RUST_LOG) is visible.
    pretty_env_logger::init();

    let source = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&source).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts any `AsyncRead` into a stream of immutable `Bytes` chunks.
///
/// `BytesCodec` yields each chunk it reads as a mutable buffer; freezing it produces the
/// `Bytes` item type. Read errors are passed through unchanged.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
where
    R: AsyncRead,
{
    let decoder = codec::BytesCodec::new();
    let frames = codec::FramedRead::new(r, decoder);
    frames.map_ok(|chunk| chunk.freeze())
}

这样代码就能按预期工作,但前提是必须事先知道文件的长度(例如可以通过 `tokio::fs::File::metadata` 获取)。