如何实现轮询异步 fn 的 Future 或 Stream?
How to implement a Future or Stream that polls an async fn?
我有一个结构 Test
我想实现 std::future::Future
来轮询 function
:
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
struct Test;
impl Test {
async fn function(&mut self) {}
}
impl Future for Test {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.function() {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(()),
}
}
}
那没用:
error[E0308]: mismatched types
--> src/lib.rs:17:13
|
10 | async fn function(&mut self) {}
| - the `Output` of this `async fn`'s expected opaque type
...
17 | Poll::Pending => Poll::Pending,
| ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
|
= note: expected opaque type `impl Future`
found enum `Poll<_>`
error[E0308]: mismatched types
--> src/lib.rs:18:13
|
10 | async fn function(&mut self) {}
| - the `Output` of this `async fn`'s expected opaque type
...
18 | Poll::Ready(_) => Poll::Ready(()),
| ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
|
= note: expected opaque type `impl Future`
found enum `Poll<_>`
我明白 function
必须调用一次,返回的 Future
必须存储在结构中的某个地方,然后必须轮询保存的未来。我试过这个:
struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
impl Test {
async fn function(&mut self) {}
fn new() -> Self {
let mut s = Self(None);
s.0 = Some(Box::pin(s.function()));
s
}
}
那也没有用:
error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
--> src/lib.rs:7:13
|
7 | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
|
= help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`
在调用 function()
之后,我获取了 Test
的 &mut
引用,因此我无法更改 Test
变量,因此可以' t 将返回的 Future
存储在 Test
.
中
我确实得到了一个不安全的解决方案(灵感来自 this)
struct Test<'a>(Option<BoxFuture<'a, ()>>);
impl Test<'_> {
async fn function(&mut self) {
println!("I'm alive!");
}
fn new() -> Self {
let mut s = Self(None);
s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
s
}
}
impl Future for Test<'_> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().unwrap().poll_unpin(cx)
}
}
希望有别的办法
我不确定你想要实现什么以及为什么,但我怀疑你正试图根据一些古老的教程或误解来实现 Future for Test
并且只是让事情变得过于复杂。
您不必手动实施 Future
。 async
函数
async fn function(...) {...}
实际上只是语法糖在幕后翻译成类似
的东西
fn function(...) -> Future<()> {...}
您所要做的就是像任何未来一样使用函数的结果,例如在其上使用 await 或调用块反应器直到它完成。例如。根据你的第一个版本,你可以简单地调用:
let mut test = Test{};
test.function().await;
更新 1
根据您的描述,我仍然认为您试图使这个最小的工作代码段过于复杂,而不需要为任何内容手动实施 Future
:
async fn asyncio() { println!("Doing async IO"); }
struct Test {
count: u32,
}
impl Test {
async fn function(&mut self) {
asyncio().await;
self.count += 1;
}
}
#[tokio::main]
async fn main() {
let mut test = Test{count: 0};
test.function().await;
println!("Count: {}", test.count);
}
虽然有时您可能想要做与您在这里尝试完成的事情类似的事情,但这种情况很少见。因此,大多数阅读本文的人,甚至可能是 OP,可能希望重组,以便用于单个异步执行的结构状态和数据是不同的对象。
回答你的问题,是的,这是有可能的。除非你想绝对求助于不安全的代码,否则你将需要使用 Mutex
and Arc
。您希望在 async fn
中操作的所有字段都必须包含在 Mutex
中,并且函数本身将接受 Arc<Self>
.
但是,我必须强调,这不是一个完美的解决方案,您可能不想这样做。根据您的具体情况,您的解决方案可能会有所不同,但我对 OP 在使用 Stream
s 时试图完成什么的猜测可以通过类似于 this gist that I wrote.
的方法更好地解决
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
struct Test {
state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
// if available use your async library's Mutex to `.await` locks on `buffer` instead
buffer: Mutex<Vec<u8>>,
}
impl Test {
async fn function(self: Arc<Self>) {
for i in 0..16u8 {
let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
let mut buflock = self.buffer.lock().unwrap();
buflock.extend_from_slice(&data);
}
}
pub fn new() -> Arc<Self> {
let s = Arc::new(Self {
state: Default::default(),
buffer: Default::default(),
});
{
// start by trying to aquire a lock to the Mutex of the Box
let mut lock = s.state.lock().unwrap();
// create boxed future
let b = Box::pin(s.clone().function());
// insert value into the mutex
*lock = Some(b);
} // block causes the lock to be released
s
}
}
impl Future for Test {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<<Self as std::future::Future>::Output> {
let mut lock = self.state.lock().unwrap();
let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
Future::poll(fut.as_mut(), ctx)
}
}
我有一个结构 Test
我想实现 std::future::Future
来轮询 function
:
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
struct Test;
impl Test {
async fn function(&mut self) {}
}
impl Future for Test {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.function() {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(()),
}
}
}
那没用:
error[E0308]: mismatched types
--> src/lib.rs:17:13
|
10 | async fn function(&mut self) {}
| - the `Output` of this `async fn`'s expected opaque type
...
17 | Poll::Pending => Poll::Pending,
| ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
|
= note: expected opaque type `impl Future`
found enum `Poll<_>`
error[E0308]: mismatched types
--> src/lib.rs:18:13
|
10 | async fn function(&mut self) {}
| - the `Output` of this `async fn`'s expected opaque type
...
18 | Poll::Ready(_) => Poll::Ready(()),
| ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
|
= note: expected opaque type `impl Future`
found enum `Poll<_>`
我明白 function
必须调用一次,返回的 Future
必须存储在结构中的某个地方,然后必须轮询保存的未来。我试过这个:
struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
impl Test {
async fn function(&mut self) {}
fn new() -> Self {
let mut s = Self(None);
s.0 = Some(Box::pin(s.function()));
s
}
}
那也没有用:
error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
--> src/lib.rs:7:13
|
7 | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
|
= help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`
在调用 function()
之后,我获取了 Test
的 &mut
引用,因此我无法更改 Test
变量,因此可以' t 将返回的 Future
存储在 Test
.
我确实得到了一个不安全的解决方案(灵感来自 this)
struct Test<'a>(Option<BoxFuture<'a, ()>>);
impl Test<'_> {
async fn function(&mut self) {
println!("I'm alive!");
}
fn new() -> Self {
let mut s = Self(None);
s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
s
}
}
impl Future for Test<'_> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().unwrap().poll_unpin(cx)
}
}
希望有别的办法
我不确定你想要实现什么以及为什么,但我怀疑你正试图根据一些古老的教程或误解来实现 Future for Test
并且只是让事情变得过于复杂。
您不必手动实施 Future
。 async
函数
async fn function(...) {...}
实际上只是语法糖在幕后翻译成类似
的东西fn function(...) -> Future<()> {...}
您所要做的就是像任何未来一样使用函数的结果,例如在其上使用 await 或调用块反应器直到它完成。例如。根据你的第一个版本,你可以简单地调用:
let mut test = Test{};
test.function().await;
更新 1
根据您的描述,我仍然认为您试图使这个最小的工作代码段过于复杂,而不需要为任何内容手动实施 Future
:
async fn asyncio() { println!("Doing async IO"); }
struct Test {
count: u32,
}
impl Test {
async fn function(&mut self) {
asyncio().await;
self.count += 1;
}
}
#[tokio::main]
async fn main() {
let mut test = Test{count: 0};
test.function().await;
println!("Count: {}", test.count);
}
虽然有时您可能想要做与您在这里尝试完成的事情类似的事情,但这种情况很少见。因此,大多数阅读本文的人,甚至可能是 OP,可能希望重组,以便用于单个异步执行的结构状态和数据是不同的对象。
回答你的问题,是的,这是有可能的。除非你想绝对求助于不安全的代码,否则你将需要使用 Mutex
and Arc
。您希望在 async fn
中操作的所有字段都必须包含在 Mutex
中,并且函数本身将接受 Arc<Self>
.
但是,我必须强调,这不是一个完美的解决方案,您可能不想这样做。根据您的具体情况,您的解决方案可能会有所不同,但我对 OP 在使用 Stream
s 时试图完成什么的猜测可以通过类似于 this gist that I wrote.
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
struct Test {
state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
// if available use your async library's Mutex to `.await` locks on `buffer` instead
buffer: Mutex<Vec<u8>>,
}
impl Test {
async fn function(self: Arc<Self>) {
for i in 0..16u8 {
let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
let mut buflock = self.buffer.lock().unwrap();
buflock.extend_from_slice(&data);
}
}
pub fn new() -> Arc<Self> {
let s = Arc::new(Self {
state: Default::default(),
buffer: Default::default(),
});
{
// start by trying to aquire a lock to the Mutex of the Box
let mut lock = s.state.lock().unwrap();
// create boxed future
let b = Box::pin(s.clone().function());
// insert value into the mutex
*lock = Some(b);
} // block causes the lock to be released
s
}
}
impl Future for Test {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<<Self as std::future::Future>::Output> {
let mut lock = self.state.lock().unwrap();
let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
Future::poll(fut.as_mut(), ctx)
}
}