Rust tokio:在生成的线程中等待异步函数

Rust tokio: Awaiting an async function in spawned thread

尝试在新的 tokio 线程中调用异步函数会导致某些函数出错。

在这个最小的例子中,使用了 crates tokio 和 iota-streams。方法 send_announce() 是异步的,returns 是一个地址。等待此方法会导致编译错误,指出 std::Marker::Send 特性未实现

dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>

根据我的理解,问题是缺少一个或多个 Sync/Send trait 实现并且能够在线程之间传递数据 Rust 需要整个链来实现 Sync 和 Send。

文档指出上述结构实现了 Sync 和 Send as auto 特性:(iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600 .. .)

在主线程中调用相同的函数工作正常。

我尝试将 send_announce() 的结果包装到一个盒子中,实现 Send trait unsafe 并将响应包装到一个结构中......,而不更改编译错误。

在这种情况下,动态未来响应似乎有点问题。我是生锈的新手,我将不胜感激我能获得的有关如何解决此问题的所有帮助或想法。这种方法甚至可行吗?

我的程序应该通过调用来调用并在单独的线程中处理请求。在此线程内,例如此公告 link 已生成。

显示的示例是将问题减少到重要部分的最小示例。 在 Ubuntu 上进行了防锈和夜间测试。

// main.rs
use iota_streams::{
    app::transport::tangle::client::Client,
    app_channels::api::tangle::{Author, ChannelType},
    core::Result,
};
use rand::Rng;

#[tokio::main]
async fn main() -> Result<()> {
    //
    // This works fine
    //
    let seed = generate_seed();
    let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
    let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
    //
    // No error occurs here
    //
    let announcement_link = author.send_announce().await?;
    //
    // Spawn new thread
    //
    tokio::spawn(async move {
        let seed = generate_seed();
        let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
        //
        // Error occurs here
        //
        let announcement_link = author.send_announce().await?;
        Ok(())
    });

    Ok(())
}
// Make a seed
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
    let seed: String = (0..81)
        .map(|_| { ALPH9
                .chars().nth(rand::thread_rng().gen_range(0..27)).unwrap()
        }).collect::<String>();
    seed
}
# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"

[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch  = "develop"}
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"
error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `teststh` due to 4 previous errors

author.send_announce() 返回的 future 不包含 Send,所以你不能在 tokio::spawn() 中使用它。

您可以尝试使用 tokio::task::LocalSet which lets you spawn non-Send futures with tokio::task::spawn_local。这适用于 运行 您在创建 LocalSet.

的单个 OS 线程上产生的任何期货

如果您想在线程池上生成非 Send 期货,您可以使用 tokio_util::task::LocalPoolHandle,它通过将工作分配给给定数量的 OS 线程来工作,每个有自己的 LocalSet.