循环中的 Rust 所有权

Rust ownership in loops

我正在尝试在 Rust 中实现 rabbitmq send/listen 功能,我有以下代码:

struct RabbitMQ {
    connection: Connection,
}

impl RabbitMQ {
    fn connect() -> Self {
        RabbitMQ {
           connection: the created connection
        }
    }
}

impl MessageBroker for RabbitMQ {
    async fn publish(&self, topic: &Topic) -> Result<PublisherConfirm, Error> {
        let channel = self.connection.create_channel().await.unwrap();

        RabbitMQ::create_exchange(&channel, &topic.exchange).await;

        let payload = topic.message.as_bytes();

        let res = channel.basic_publish(
            topic.exchange.name.as_str(),
            topic.queue.routing_key.as_str(),
            topic.exchange.publish_options,
            payload.to_vec(),
            BasicProperties::default(),
         );

        res.await
    }
}

目前一切顺利!

现在我想在一个 for 循环中发布许多消息而不等待服务器的确认,问题是当我生成 tokio 异步任务时我需要移动我的代理值,这使得它对下一个无效循环迭代:

let broker = RabbitMQ::connect(&connection_details).await;

for x in 1..10 {
    tokio::spawn(async move {
        let confirm = broker.publish(&my_topic).await.unwrap();
    }).await.unwrap();
}

以上代码无法编译并出现以下错误:

 error[E0382]: use of moved value: `broker`
  --> src/main.rs:47:33
   |
21 |       let broker = RabbitMQ::connect(&connection_details).await;
   |           ------ move occurs because `broker` has type `message_broker::RabbitMQ`, which >does not implement the `Copy` trait
...
47 |           tokio::spawn(async move {
   |  _________________________________^
48 | |             let confirm = &broker.publish(&enable_cdn).await.unwrap();
   | |                            ------ use occurs due to use in generator
49 | |         }).await.unwrap();
   | |_________^ value moved here, in previous iteration of loop

我无法实现 Copy 特性,因为 Connection 不是原始的,而且我似乎无法使用引用“ &”给经纪人。

我的问题是如何在不编写 n 发布调用的情况下完成此操作?

您正在使用 async move 块,这意味着块中使用的任何 name 都将移至未来,无论正在执行的操作如何。所以写

&broker.publish

块内部没有区别:首先移动 broker,未来(当用 .await 轮询时)对其进行内部引用。所以你需要做的是在块外借用,然后将借用的块移动到块内:

let broker = RabbitMQ::connect(&connection_details).await;

for x in 1..10 {
    let broker = &broker;
    tokio::spawn(async move {
        let confirm = broker.publish(&enable_cdn).await.unwrap();
    }).await.unwrap();
}

但我 认为 这也行不通:tokio::spawn 没有作用域,所以即使你 await-ing 它,编译器不知道它不会比 broker 长寿。就它而言,tokio 任务可以想活多久就活多久。这意味着你现在可能会得到一个生命周期错误(编译器会假设借用 can 比封闭函数长寿,因此它的起源)。

一个简单的解决方案是将连接放在 Arc 或其他东西后面。

或者,重组您的系统以更好地满足 rabbitmq 的要求:不知道您使用的是哪个,但 amiquip 声明连接是 thread-safe 和 通道 而不是 thread-safe 可以发送到其他线程。

因此,与其 publish-ing 隐式连接,不如在循环的每次迭代中创建一个通道并将 that 移动到任务中,以便实际执行发布.

此外,

Now I want to publish many messages in a for loop without waiting for the confirmation from the server

既然你在等待 tokio::spawn 的结果,你还在这样做吗?