为什么从 Actix Web 处理程序中的 Rusoto S3 流读取会导致死锁?
Why does reading from a Rusoto S3 stream inside an Actix Web handler cause a deadlock?
我正在使用 actix_web
和 rusoto_s3
编写应用程序。
当我 运行 actix 请求之外的命令直接来自 main
时,它 运行 没问题,并且 get_object
按预期工作。当它被封装在 actix_web 请求中时,流将永远被阻塞。
我有一个为所有请求共享的客户端,它被封装到一个 Arc
中(这发生在 actix 数据内部)。
完整代码:
fn index(
_req: HttpRequest,
path: web::Path<String>,
s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
s3.get_object(GetObjectRequest {
bucket: "my_bucket".to_owned(),
key: path.to_owned(),
..Default::default()
})
.and_then(move |res| {
info!("Response {:?}", res);
let mut stream = res.body.unwrap().into_blocking_read();
let mut body = Vec::new();
stream.read_to_end(&mut body).unwrap();
match process_file(body.as_slice()) {
Ok(result) => Ok(result),
Err(error) => Err(RusotoError::from(error)),
}
})
.map_err(|e| match e {
RusotoError::Service(GetObjectError::NoSuchKey(key)) => {
actix_web::error::ErrorNotFound(format!("{} not found", key))
}
error => {
error!("Error: {:?}", error);
actix_web::error::ErrorInternalServerError("error")
}
})
.from_err()
.and_then(move |img| HttpResponse::Ok().body(Body::from(img)))
}
fn health() -> HttpResponse {
HttpResponse::Ok().finish()
}
fn main() -> std::io::Result<()> {
let name = "rust_s3_test";
env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let sys = actix_rt::System::builder().stop_on_panic(true).build();
let prometheus = PrometheusMetrics::new(name, "/metrics");
let s3 = S3Client::new(Region::Custom {
name: "eu-west-1".to_owned(),
endpoint: "http://localhost:9000".to_owned(),
});
let s3_client_data = web::Data::new(s3);
Server::build()
.bind(name, "0.0.0.0:8080", move || {
HttpService::build().keep_alive(KeepAlive::Os).h1(App::new()
.register_data(s3_client_data.clone())
.wrap(prometheus.clone())
.wrap(actix_web::middleware::Logger::default())
.service(web::resource("/health").route(web::get().to(health)))
.service(web::resource("/{file_name}").route(web::get().to_async(index))))
})?
.start();
sys.run()
}
在 stream.read_to_end
中,线程被阻塞且从未解决。
我尝试过为每个请求克隆客户端并为每个请求创建一个新客户端,但我在所有情况下都得到了相同的结果。
我是不是做错了什么?
如果我不使用异步它就可以工作...
s3.get_object(GetObjectRequest {
bucket: "my_bucket".to_owned(),
key: path.to_owned(),
..Default::default()
})
.sync()
.unwrap()
.body
.unwrap()
.into_blocking_read();
let mut body = Vec::new();
io::copy(&mut stream, &mut body);
这是 Tokio 的问题吗?
let mut stream = res.body.unwrap().into_blocking_read();
检查 implementation of into_blocking_read()
:它调用 .wait()
。您不应该在 Future
中调用阻塞代码。
由于 Rusoto 的 body
是一个 Stream
,所以有一种方法可以异步读取它:
.and_then(move |res| {
info!("Response {:?}", res);
let stream = res.body.unwrap();
stream.concat2().map(move |file| {
process_file(&file[..]).unwrap()
})
.map_err(|e| RusotoError::from(e)))
})
process_file
不应阻塞封闭的 Future
。如果需要阻塞,可以考虑运行在新线程上,或者封装成tokio_threadpool's blocking
。
注意:你可以在你的实现中使用tokio_threadpool的blocking
,但我建议你先了解它是如何工作的。
如果你不打算将整个文件加载到内存中,你可以使用for_each
:
stream.for_each(|part| {
//process each part in here
//Warning! Do not add blocking code here either.
})
另请参阅:
我正在使用 actix_web
和 rusoto_s3
编写应用程序。
当我 运行 actix 请求之外的命令直接来自 main
时,它 运行 没问题,并且 get_object
按预期工作。当它被封装在 actix_web 请求中时,流将永远被阻塞。
我有一个为所有请求共享的客户端,它被封装到一个 Arc
中(这发生在 actix 数据内部)。
完整代码:
fn index(
_req: HttpRequest,
path: web::Path<String>,
s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
s3.get_object(GetObjectRequest {
bucket: "my_bucket".to_owned(),
key: path.to_owned(),
..Default::default()
})
.and_then(move |res| {
info!("Response {:?}", res);
let mut stream = res.body.unwrap().into_blocking_read();
let mut body = Vec::new();
stream.read_to_end(&mut body).unwrap();
match process_file(body.as_slice()) {
Ok(result) => Ok(result),
Err(error) => Err(RusotoError::from(error)),
}
})
.map_err(|e| match e {
RusotoError::Service(GetObjectError::NoSuchKey(key)) => {
actix_web::error::ErrorNotFound(format!("{} not found", key))
}
error => {
error!("Error: {:?}", error);
actix_web::error::ErrorInternalServerError("error")
}
})
.from_err()
.and_then(move |img| HttpResponse::Ok().body(Body::from(img)))
}
fn health() -> HttpResponse {
HttpResponse::Ok().finish()
}
fn main() -> std::io::Result<()> {
let name = "rust_s3_test";
env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let sys = actix_rt::System::builder().stop_on_panic(true).build();
let prometheus = PrometheusMetrics::new(name, "/metrics");
let s3 = S3Client::new(Region::Custom {
name: "eu-west-1".to_owned(),
endpoint: "http://localhost:9000".to_owned(),
});
let s3_client_data = web::Data::new(s3);
Server::build()
.bind(name, "0.0.0.0:8080", move || {
HttpService::build().keep_alive(KeepAlive::Os).h1(App::new()
.register_data(s3_client_data.clone())
.wrap(prometheus.clone())
.wrap(actix_web::middleware::Logger::default())
.service(web::resource("/health").route(web::get().to(health)))
.service(web::resource("/{file_name}").route(web::get().to_async(index))))
})?
.start();
sys.run()
}
在 stream.read_to_end
中,线程被阻塞且从未解决。
我尝试过为每个请求克隆客户端并为每个请求创建一个新客户端,但我在所有情况下都得到了相同的结果。
我是不是做错了什么?
如果我不使用异步它就可以工作...
s3.get_object(GetObjectRequest {
bucket: "my_bucket".to_owned(),
key: path.to_owned(),
..Default::default()
})
.sync()
.unwrap()
.body
.unwrap()
.into_blocking_read();
let mut body = Vec::new();
io::copy(&mut stream, &mut body);
这是 Tokio 的问题吗?
let mut stream = res.body.unwrap().into_blocking_read();
检查 implementation of into_blocking_read()
:它调用 .wait()
。您不应该在 Future
中调用阻塞代码。
由于 Rusoto 的 body
是一个 Stream
,所以有一种方法可以异步读取它:
.and_then(move |res| {
info!("Response {:?}", res);
let stream = res.body.unwrap();
stream.concat2().map(move |file| {
process_file(&file[..]).unwrap()
})
.map_err(|e| RusotoError::from(e)))
})
process_file
不应阻塞封闭的 Future
。如果需要阻塞,可以考虑运行在新线程上,或者封装成tokio_threadpool's blocking
。
注意:你可以在你的实现中使用tokio_threadpool的blocking
,但我建议你先了解它是如何工作的。
如果你不打算将整个文件加载到内存中,你可以使用for_each
:
stream.for_each(|part| {
//process each part in here
//Warning! Do not add blocking code here either.
})
另请参阅: