How do I stream a hyper Request's Body from a slow-processing side thread that produces chunks of data?
I have a program that generates data slowly (say it's computationally intensive, like computing the digits of pi). It produces a lot of data: each response can be 1 GiB, will not fit in memory, and must be generated on demand. I'm using hyper to write a web service that generates the content when it is requested.

Let's skip the boilerplate (service_fn, Server::bind).

The API that slowly generates data might look like:
use std::io;

struct SlowData;

impl SlowData {
    fn new(initial: &str) -> SlowData {
        unimplemented!()
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        unimplemented!()
    }
}
type ResponseFuture = Box<Future<Item = Response, Error = GenericError> + Send>;

fn run(req: Request) -> ResponseFuture {
    // spawn a thread and:
    // initialize the generator
    // SlowData::new(&req.uri().path());
    // spawn a thread and call slow.next_block() until len() == 0
    // each byte which comes from next_block should go to the client
    // as part of the Body
}
Note that SlowData::new is also computationally intensive.

Ideally, we would minimize copies and send the &[u8] directly to hyper, without having to copy it into a Vec or similar first.

How can I fulfill the hyper Request's Body from a side thread?
Spawn a thread in a thread pool and send chunks of data over a channel. The channel's receiver implements Stream, and a hyper Body can be built from a Stream with wrap_stream:
use futures::{channel::mpsc, executor::ThreadPool, task::SpawnExt, SinkExt, Stream}; // 0.3.1, features = ["thread-pool"]
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Response, Server,
}; // 0.13.1
use std::{convert::Infallible, io, thread, time::Duration};
use tokio; // 0.2.6, features = ["macros"]

struct SlowData;

impl SlowData {
    fn new(_initial: &str) -> SlowData {
        thread::sleep(Duration::from_secs(1));
        Self
    }

    fn next_block(&self) -> io::Result<&[u8]> {
        thread::sleep(Duration::from_secs(1));
        Ok(b"data")
    }
}

fn stream(pool: ThreadPool) -> impl Stream<Item = io::Result<Vec<u8>>> {
    let (mut tx, rx) = mpsc::channel(10);

    pool.spawn(async move {
        let sd = SlowData::new("dummy");
        for _ in 0..3 {
            let block = sd.next_block().map(|b| b.to_vec());
            tx.send(block).await.expect("Unable to send block");
        }
    })
    .expect("Unable to spawn thread");

    rx
}

#[tokio::main]
async fn main() {
    // Construct our SocketAddr to listen on...
    let addr = ([127, 0, 0, 1], 3000).into();

    // Create a threadpool (cloning is cheap)...
    let pool = ThreadPool::new().unwrap();

    // Handle each connection...
    let make_service = make_service_fn(|_socket| {
        let pool = pool.clone();
        async {
            // Handle each request...
            let svc_fn = service_fn(move |_request| {
                let pool = pool.clone();
                async {
                    let data = stream(pool);
                    let resp = Response::new(Body::wrap_stream(data));
                    Result::<_, Infallible>::Ok(resp)
                }
            });

            Result::<_, Infallible>::Ok(svc_fn)
        }
    });

    // Bind and serve...
    let server = Server::bind(&addr).serve(make_service);

    // Finally, run the server
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
It isn't possible to avoid copying the slice into a Vec once a thread is involved.
See also:
- This answer for hyper 0.12 and futures 0.1
- How to implement a stream of futures for a blocking call using futures.rs and Redis PubSub?