AsyncRead 包装同步读取
AsyncRead wrapper over sync read
我在通过同步读取实现 AsyncRead 以适应 Rust 中的异步世界时遇到了这个问题。
我正在处理的同步读取实现是对原始 C 同步实现的包装,很像 std::fs::File::read
;因此,为了简单起见,我将在下文中使用 std::io::Read
。
代码如下:
use futures::{AsyncRead, Future};
use std::task::{Context, Poll};
use std::pin::Pin;
use tokio::task;
use std::fs::File;
use std::io::Read;
use std::io::Result;
struct FileAsyncRead {
path: String
}
impl AsyncRead for FileAsyncRead {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
let path = self.path.to_owned();
let buf_len = buf.len();
let mut handle = task::spawn_blocking(move || {
let mut vec = vec![0u8; buf_len];
let mut file = File::open(path).unwrap();
let len = file.read(vec.as_mut_slice());
(vec, len)
});
match Pin::new(&mut handle).poll(cx) {
Poll::Ready(l) => {
let v_l = l.unwrap();
let _c_l = v_l.0.as_slice().read(buf);
Poll::Ready(v_l.1)
}
Poll::Pending => Poll::Pending
}
}
}
当前的实现每次都创建一个与外部 buf: &mut [u8]
大小相同的新向量,因为:
`buf` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
buf: &mut [u8],
| --------- this data with an anonymous lifetime `'_`...
我的问题是:
- 是否可以避免在
spwan_blocking
中创建矢量并在 poll_read
中改变 buf
?为了避免向量分配和复制?
- 有没有比
spawn_blocking
和 Pin::new(&mut handle).poll(cx)
更好的方式来表达这种“包装”逻辑?在 Rust 中执行此操作的更惯用的方法是什么?
这段代码有些奇怪:
- 如果此代码被调用一次,它可能会 return Poll::Pending,因为 spawn_blocking 甚至需要时间来启动任务。
- 如果多次调用此方法,则会创建多个不相关的任务来读取文件的同一部分,并且可能会忽略 (1) 导致的结果,这可能不是您想要的。
要解决此问题,您可以在第一次创建任务时记住 FileAsyncRead 结构中的任务,然后在下一次调用时仅在需要时启动新任务,并轮询现有任务。
有了这个 API 你似乎无法避免双缓冲,因为你的 API 是阻塞的,而 ReadBuf 缓冲区不是共享的,你需要做一个阻塞读入其他缓冲区,然后在新的非阻塞调用 poll_read()
到达时复制数据。
我在通过同步读取实现 AsyncRead 以适应 Rust 中的异步世界时遇到了这个问题。
我正在处理的同步读取实现是对原始 C 同步实现的包装,很像 std::fs::File::read
;因此,为了简单起见,我将在下文中使用 std::io::Read
。
代码如下:
use futures::{AsyncRead, Future};
use std::task::{Context, Poll};
use std::pin::Pin;
use tokio::task;
use std::fs::File;
use std::io::Read;
use std::io::Result;
struct FileAsyncRead {
path: String
}
impl AsyncRead for FileAsyncRead {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
let path = self.path.to_owned();
let buf_len = buf.len();
let mut handle = task::spawn_blocking(move || {
let mut vec = vec![0u8; buf_len];
let mut file = File::open(path).unwrap();
let len = file.read(vec.as_mut_slice());
(vec, len)
});
match Pin::new(&mut handle).poll(cx) {
Poll::Ready(l) => {
let v_l = l.unwrap();
let _c_l = v_l.0.as_slice().read(buf);
Poll::Ready(v_l.1)
}
Poll::Pending => Poll::Pending
}
}
}
当前的实现每次都创建一个与外部 buf: &mut [u8]
大小相同的新向量,因为:
`buf` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
buf: &mut [u8],
| --------- this data with an anonymous lifetime `'_`...
我的问题是:
- 是否可以避免在
spwan_blocking
中创建矢量并在poll_read
中改变buf
?为了避免向量分配和复制? - 有没有比
spawn_blocking
和Pin::new(&mut handle).poll(cx)
更好的方式来表达这种“包装”逻辑?在 Rust 中执行此操作的更惯用的方法是什么?
这段代码有些奇怪:
- 如果此代码被调用一次,它可能会 return Poll::Pending,因为 spawn_blocking 甚至需要时间来启动任务。
- 如果多次调用此方法,则会创建多个不相关的任务来读取文件的同一部分,并且可能会忽略 (1) 导致的结果,这可能不是您想要的。
要解决此问题,您可以在第一次创建任务时记住 FileAsyncRead 结构中的任务,然后在下一次调用时仅在需要时启动新任务,并轮询现有任务。
有了这个 API 你似乎无法避免双缓冲,因为你的 API 是阻塞的,而 ReadBuf 缓冲区不是共享的,你需要做一个阻塞读入其他缓冲区,然后在新的非阻塞调用 poll_read()
到达时复制数据。