Rust tokio:在生成的线程中等待异步函数
Rust tokio: Awaiting an async function in spawned thread
尝试在新的 tokio 线程中调用异步函数会导致某些函数出错。
在这个最小的例子中,使用了 crates tokio 和 iota-streams。方法 send_announce() 是异步的,returns 是一个地址。等待此方法会导致编译错误,指出 std::Marker::Send 特性未实现
dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>
根据我的理解,问题是缺少一个或多个 Sync/Send trait 实现并且能够在线程之间传递数据 Rust 需要整个链来实现 Sync 和 Send。
文档指出上述结构实现了 Sync 和 Send as auto 特性:(iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600 .. .)
在主线程中调用相同的函数工作正常。
我尝试将 send_announce() 的结果包装到一个盒子中,实现 Send trait unsafe 并将响应包装到一个结构中......,而不更改编译错误。
在这种情况下,动态未来响应似乎有点问题。我是生锈的新手,我将不胜感激我能获得的有关如何解决此问题的所有帮助或想法。这种方法甚至可行吗?
我的程序应该通过调用来调用并在单独的线程中处理请求。在此线程内,例如此公告 link 已生成。
显示的示例是将问题减少到重要部分的最小示例。
在 Ubuntu 上进行了防锈和夜间测试。
// main.rs
use iota_streams::{
app::transport::tangle::client::Client,
app_channels::api::tangle::{Author, ChannelType},
core::Result,
};
use rand::Rng;
#[tokio::main]
async fn main() -> Result<()> {
//
// This works fine
//
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
//
// No error occurs here
//
let announcement_link = author.send_announce().await?;
//
// Spawn new thread
//
tokio::spawn(async move {
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
//
// Error occurs here
//
let announcement_link = author.send_announce().await?;
Ok(())
});
Ok(())
}
// Make a seed
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
let seed: String = (0..81)
.map(|_| { ALPH9
.chars().nth(rand::thread_rng().gen_range(0..27)).unwrap()
}).collect::<String>();
seed
}
# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"
[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: could not compile `teststh` due to 4 previous errors
author.send_announce()
返回的 future 不包含 Send
,所以你不能在 tokio::spawn()
中使用它。
您可以尝试使用 tokio::task::LocalSet
which lets you spawn non-Send
futures with tokio::task::spawn_local
。这适用于 运行 您在创建 LocalSet
.
的单个 OS 线程上产生的任何期货
如果您想在线程池上生成非 Send
期货,您可以使用 tokio_util::task::LocalPoolHandle
,它通过将工作分配给给定数量的 OS 线程来工作,每个有自己的 LocalSet
.
尝试在新的 tokio 线程中调用异步函数会导致某些函数出错。
在这个最小的例子中,使用了 crates tokio 和 iota-streams。方法 send_announce() 是异步的,returns 是一个地址。等待此方法会导致编译错误,指出 std::Marker::Send 特性未实现
dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>
根据我的理解,问题是缺少一个或多个 Sync/Send trait 实现并且能够在线程之间传递数据 Rust 需要整个链来实现 Sync 和 Send。
文档指出上述结构实现了 Sync 和 Send as auto 特性:(iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600 .. .)
在主线程中调用相同的函数工作正常。
我尝试将 send_announce() 的结果包装到一个盒子中,实现 Send trait unsafe 并将响应包装到一个结构中......,而不更改编译错误。
在这种情况下,动态未来响应似乎有点问题。我是生锈的新手,我将不胜感激我能获得的有关如何解决此问题的所有帮助或想法。这种方法甚至可行吗?
我的程序应该通过调用来调用并在单独的线程中处理请求。在此线程内,例如此公告 link 已生成。
显示的示例是将问题减少到重要部分的最小示例。 在 Ubuntu 上进行了防锈和夜间测试。
// main.rs
use iota_streams::{
app::transport::tangle::client::Client,
app_channels::api::tangle::{Author, ChannelType},
core::Result,
};
use rand::Rng;
#[tokio::main]
async fn main() -> Result<()> {
//
// This works fine
//
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
//
// No error occurs here
//
let announcement_link = author.send_announce().await?;
//
// Spawn new thread
//
tokio::spawn(async move {
let seed = generate_seed();
let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
//
// Error occurs here
//
let announcement_link = author.send_announce().await?;
Ok(())
});
Ok(())
}
// Make a seed
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
let seed: String = (0..81)
.map(|_| { ALPH9
.chars().nth(rand::thread_rng().gen_range(0..27)).unwrap()
}).collect::<String>();
seed
}
# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"
[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: generator cannot be sent between threads safely
--> src/main.rs:17:5
|
17 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
--> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: could not compile `teststh` due to 4 previous errors
author.send_announce()
返回的 future 不包含 Send
,所以你不能在 tokio::spawn()
中使用它。
您可以尝试使用 tokio::task::LocalSet
which lets you spawn non-Send
futures with tokio::task::spawn_local
。这适用于 运行 您在创建 LocalSet
.
如果您想在线程池上生成非 Send
期货,您可以使用 tokio_util::task::LocalPoolHandle
,它通过将工作分配给给定数量的 OS 线程来工作,每个有自己的 LocalSet
.