Rust AWS multipart upload using rusoto, multithreaded (rayon), panicked at 'there is no reactor running ...'
I am trying to upload a file to AWS in Rust, and for this I am using rusoto_s3, the S3 Rust client. I managed to get the multipart upload code working when the parts are sent from a single thread; however, that is not what I want. I want to upload big files, and I want to be able to send the parts from multiple threads. For that, I did a little bit of googling and I came across rayon.
For some background, here is how a multipart upload works (the three calls involved are also sketched right after this list):
- Initiate the multipart upload -> AWS will return an upload ID.
- Send the different parts using this ID, passing the file chunk and a part number -> AWS will return an ETag for each part.
- Once all parts are sent, send a complete-upload request containing all the completed parts as an array of ETag and part number pairs.
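In rusoto_s3 terms this boils down to three calls. Here is a stripped-down, single-threaded sketch of the flow (my own reduction; the function name and error type are just placeholders, and chunking and error handling are left out):

// Sketch only: the three rusoto_s3 calls behind a multipart upload.
async fn multipart_sketch(
    s3: &rusoto_s3::S3Client,
    bucket: String,
    key: String,
    chunk: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    use rusoto_s3::*;
    // 1. Initiate -> upload_id
    let created = s3
        .create_multipart_upload(CreateMultipartUploadRequest {
            bucket: bucket.clone(),
            key: key.clone(),
            ..Default::default()
        })
        .await?;
    let upload_id = created.upload_id.unwrap();
    // 2. Upload a part -> ETag (repeat for every chunk, incrementing part_number)
    let part = s3
        .upload_part(UploadPartRequest {
            body: Some(chunk.into()),
            bucket: bucket.clone(),
            key: key.clone(),
            upload_id: upload_id.clone(),
            part_number: 1,
            ..Default::default()
        })
        .await?;
    // 3. Complete, sending every (ETag, part number) pair
    s3.complete_multipart_upload(CompleteMultipartUploadRequest {
        bucket,
        key,
        upload_id,
        multipart_upload: Some(CompletedMultipartUpload {
            parts: Some(vec![CompletedPart {
                e_tag: part.e_tag.clone(),
                part_number: Some(1),
            }]),
        }),
        ..Default::default()
    })
    .await?;
    Ok(())
}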
I am new to Rust, coming from a C++ and Java background. Here is my code:
#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = "24_time_test.dcm";
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 7_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);
    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };
    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();
    // Create upload parts
    let create_upload_part = |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename.to_owned(),
            upload_id: upload_id.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };
    let completed_parts = Arc::new(Mutex::new(vec![]));
    rayon::scope(|scope| {
        let mut part_number = 1;
        loop {
            let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
            println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
            file.by_ref()
                .take(maximum_bytes_to_read as u64)
                .read_to_end(&mut buffer)
                .unwrap();
            println!("length: {}", buffer.len());
            println!("part_number: {}", part_number);
            if buffer.len() == 0 {
                // The file has ended.
                break;
            }
            let next_buffer = Vec::with_capacity(CHUNK_SIZE);
            let data_to_send = buffer;
            let completed_parts_cloned = completed_parts.clone();
            scope.spawn(move |_| {
                let part = create_upload_part(data_to_send.to_vec(), part_number);
                {
                    let part_number = part.part_number;
                    let client = executor::block_on(super::get_client());
                    let response = executor::block_on(client.s3.upload_part(part));
                    completed_parts_cloned.lock().unwrap().push(CompletedPart {
                        e_tag: response
                            .expect("Couldn't complete multipart upload")
                            .e_tag
                            .clone(),
                        part_number: Some(part_number),
                    });
                }
            });
            buffer = next_buffer;
            part_number = part_number + 1;
        }
    });
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts.lock().unwrap().to_vec()),
    };
    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
Here is a sample of the output and the error I get:
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 1
maximum_bytes_to_read: 7000000
length: 7000000
part_number: 2
maximum_bytes_to_read: 7000000
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', C:\Users\DNDT\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\blocking\pool.rs:85:33
length: 7000000
I googled this error, but I don't have a clear understanding of what it actually means:
"there is no reactor running, must be called from the context of a Tokio runtime"
Here is what I found: it seems to be some compatibility problem, because rusoto_s3 might be using a tokio version that is incompatible with the tokio version I have.
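For what it's worth, the panic can be reproduced without rusoto at all. Here is a minimal snippet (my own reduction, assuming the real cause is the missing reactor rather than a version mismatch): futures::executor::block_on knows nothing about tokio's reactor, so any tokio-backed future (a timer here, rusoto's hyper-based HTTP call in my case) fails the same way:

// Panics with "there is no reactor running, must be called from the
// context of a Tokio 1.x runtime": the future needs tokio's timer/IO
// driver, and futures::executor::block_on does not provide one.
fn main() {
    futures::executor::block_on(async {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    });
}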
Here are some of the relevant dependencies:
tokio = { version = "1", features = ["full"] }
tokio-compat-02 = "0.1.2"
rusoto_s3 = "0.46.0"
rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rayon = "1.5.0"
I think the main issue is actually trying to run async code inside rayon threads. I tried to turn my async code into blocking code using executor::block_on. I also spent some time making the compiler happy: I have multiple threads that all want to write to let completed_parts = Arc::new(Mutex::new(vec![])); so I did some cloning there to keep the compiler happy.
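For completeness, here is one pattern I came across while digging (a sketch only; I did not end up using it, see the answer further down): assuming a recent tokio 1.x and a multi-threaded runtime, the rayon threads can be handed a tokio::runtime::Handle and call handle.block_on instead of futures::executor::block_on, so the futures are driven by the real reactor.

use std::time::Duration;
use tokio::runtime::Handle;

// Sketch: drive tokio futures from rayon worker threads via a runtime handle.
// The multi_thread flavor matters: other tokio workers keep the reactor
// running while this test's thread is blocked inside rayon::scope.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rayon_with_tokio_handle_sketch() {
    let handle = Handle::current(); // grab it while we are on the runtime
    rayon::scope(|scope| {
        for part_number in 1..=3 {
            let handle = handle.clone();
            scope.spawn(move |_| {
                // A real client.s3.upload_part(part) future would go where the
                // sleep is; handle.block_on gives it a working reactor.
                handle.block_on(async move {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    println!("pretend part {} uploaded", part_number);
                });
            });
        }
    });
}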
Also, in case the crates I used matter, here they are:
extern crate dotenv;
extern crate tokio;
use bytes::Bytes;
use dotenv::dotenv;
use futures::executor;
use futures::*;
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
use rusoto_s3::util::{PreSignedRequest, PreSignedRequestOption};
use rusoto_s3::PutObjectRequest;
use rusoto_s3::StreamingBody;
use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, UploadPartRequest, S3,
};
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use tokio::fs;
I'm new to Rust, so there are a lot of moving parts to get this right!
Thanks to the discussion with @Jmb, I got rid of the threads and I spawn a tokio task instead, as follows:
Create a vector to hold our futures so we can wait for them:
let mut multiple_parts_futures = Vec::new();
Spawn the async tasks:
loop { // loop over the file chunks
    ...
    let send_part_task_future = tokio::task::spawn(async move {
        // Upload part
        ...
    });
    multiple_parts_futures.push(send_part_task_future);
}
Then wait for all the futures:
let _results = futures::future::join_all(multiple_parts_futures).await;
It is also worth mentioning that the completed parts need to be sorted, since complete_multipart_upload expects the parts in ascending part-number order:
let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
completed_parts_vector.sort_by_key(|part| part.part_number);
The whole code is:
#[tokio::test]
async fn if_multipart_then_upload_multiparts_dicom() {
    let now = Instant::now();
    dotenv().ok();
    let local_filename = "./files/test_big.DCM";
    let destination_filename = generate_unique_name();
    let destination_filename_clone = destination_filename.clone();
    let mut file = std::fs::File::open(local_filename).unwrap();
    const CHUNK_SIZE: usize = 6_000_000;
    let mut buffer = Vec::with_capacity(CHUNK_SIZE);
    let client = super::get_client().await;
    let create_multipart_request = CreateMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        ..Default::default()
    };
    // Start the multipart upload and note the upload_id generated
    let response = client
        .s3
        .create_multipart_upload(create_multipart_request)
        .await
        .expect("Couldn't create multipart upload");
    let upload_id = response.upload_id.unwrap();
    let upload_id_clone = upload_id.clone();
    // Create upload parts
    let create_upload_part = move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
        UploadPartRequest {
            body: Some(body.into()),
            bucket: client.bucket_name.to_owned(),
            key: destination_filename_clone.to_owned(),
            upload_id: upload_id_clone.to_owned(),
            part_number: part_number,
            ..Default::default()
        }
    };
    let create_upload_part_arc = Arc::new(create_upload_part);
    let completed_parts = Arc::new(Mutex::new(vec![]));
    let mut part_number = 1;
    let mut multiple_parts_futures = Vec::new();
    loop {
        let maximum_bytes_to_read = CHUNK_SIZE - buffer.len();
        println!("maximum_bytes_to_read: {}", maximum_bytes_to_read);
        file.by_ref()
            .take(maximum_bytes_to_read as u64)
            .read_to_end(&mut buffer)
            .unwrap();
        println!("length: {}", buffer.len());
        println!("part_number: {}", part_number);
        if buffer.len() == 0 {
            // The file has ended.
            break;
        }
        let next_buffer = Vec::with_capacity(CHUNK_SIZE);
        let data_to_send = buffer;
        let completed_parts_cloned = completed_parts.clone();
        let create_upload_part_arc_cloned = create_upload_part_arc.clone();
        let send_part_task_future = tokio::task::spawn(async move {
            let part = create_upload_part_arc_cloned(data_to_send.to_vec(), part_number);
            {
                let part_number = part.part_number;
                let client = super::get_client().await;
                let response = client.s3.upload_part(part).await;
                completed_parts_cloned.lock().unwrap().push(CompletedPart {
                    e_tag: response
                        .expect("Couldn't complete multipart upload")
                        .e_tag
                        .clone(),
                    part_number: Some(part_number),
                });
            }
        });
        multiple_parts_futures.push(send_part_task_future);
        buffer = next_buffer;
        part_number = part_number + 1;
    }
    let client = super::get_client().await;
    println!("waiting for futures");
    let _results = futures::future::join_all(multiple_parts_futures).await;
    let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
    completed_parts_vector.sort_by_key(|part| part.part_number);
    println!("futures done");
    let completed_upload = CompletedMultipartUpload {
        parts: Some(completed_parts_vector),
    };
    let complete_req = CompleteMultipartUploadRequest {
        bucket: client.bucket_name.to_owned(),
        key: destination_filename.to_owned(),
        upload_id: upload_id.to_owned(),
        multipart_upload: Some(completed_upload),
        ..Default::default()
    };
    client
        .s3
        .complete_multipart_upload(complete_req)
        .await
        .expect("Couldn't complete multipart upload");
    println!(
        "time taken: {}, with chunk:: {}",
        now.elapsed().as_secs(),
        CHUNK_SIZE
    );
}
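One refinement I have been considering but have not wired into the code above (sketch only, the names are placeholders): with very large files the loop spawns one task per part, each holding a ~6 MB buffer, so it may be worth capping how many part uploads are in flight with a tokio::sync::Semaphore. A standalone toy version of the pattern:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Toy version of the throttling pattern: at most 4 "parts" in flight at once.
#[tokio::main]
async fn main() {
    let max_in_flight = Arc::new(Semaphore::new(4));
    let mut tasks = Vec::new();
    for part_number in 1..=20 {
        // Wait here until one of the 4 slots frees up before spawning.
        let permit = max_in_flight.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::task::spawn(async move {
            let _permit = permit; // released when the task (the upload) finishes
            // a real upload_part(...).await call would go here
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            println!("part {} done", part_number);
        }));
    }
    futures::future::join_all(tasks).await;
}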