等待多个期货借用可变自我

Waiting on multiple futures borrowing mutable self

以下每个方法都需要(&mut self)来操作。下面的代码给出了错误。

cannot borrow *self as mutable more than once at a time

我怎样才能正确地做到这一点?

loop {
        let future1 = self.handle_new_connections(sender_to_connector.clone());
        let future2 = self.handle_incoming_message(&mut receiver_from_peers);
        let future3 = self.handle_outgoing_message();

        tokio::pin!(future1, future2, future3);
        tokio::select! {
            _=future1=>{},
            _=future2=>{},
            _=future3=>{}
        }
    }

不允许对一个对象有多个可变引用,这是有充分理由的。 想象一下,您将一个对象可变地传递给 2 个不同的函数,并且他们编辑了不同步的对象,因为您没有任何适当的机制。那么你最终会遇到一种叫做竞争条件的东西。

为了防止这个错误,rust 一次只允许一个对象的一个​​可变引用,但你可以有多个不可变引用,而且你经常看到人们使用内部可变性模式。

在您的情况下,您希望数据不能同时被 2 个不同的线程修改,因此您将其包装在 Lock 或 RwLock 中,因为您希望多个线程能够拥有它将其包装在弧形中的值。

here 您可以阅读有关内部可变性的更多详细信息。

或者,在声明函数类型的同时,您可以添加适当的生命周期以指示结果 Future 将在相同的上下文中等待,方法是给它一个生命周期,因为您的代码在下一次迭代之前等待未来也做这个把戏。

我在处理异步代码时遇到了同样的问题。这是我的发现:

假设您有一个 Engine,其中包含 incomingoutgoing:

struct Engine {
    log: Arc<Mutex<Vec<String>>>,
    outgoing: UnboundedSender<String>,
    incoming: UnboundedReceiver<String>,
}

我们的目标是创建两个函数 process_incomingprocess_logic,然后同时轮询它们,而不会弄乱 Rust 中的借用检查器。

这里重要的是:

  1. 您不能同时将 &mut self 传递给这些异步函数。
  2. incomingoutgoing 最多只能由一个函数持有。
  3. process_incomingprocess_logic的数据访问都需要用锁包裹。
  4. 任何尝试直接锁定 Engine 都会导致运行时出现死锁。

所以这让我们放弃使用支持关联函数的方法:

impl Engine {
    // ...
    async fn process_logic(outgoing: &mut UnboundedSender<String>, log: Arc<Mutex<Vec<String>>>) {
        loop {
            Delay::new(Duration::from_millis(1000)).await.unwrap();
            let msg: String = "ping".into();
            println!("outgoing: {}", msg);
            log.lock().push(msg.clone());
            outgoing.send(msg).await.unwrap();
        }
    }

    async fn process_incoming(
        incoming: &mut UnboundedReceiver<String>,
        log: Arc<Mutex<Vec<String>>>,
    ) {
        while let Some(msg) = incoming.next().await {
            println!("incoming: {}", msg);
            log.lock().push(msg);
        }
    }
}

然后我们可以将 main 写成:

fn main() {
    futures::executor::block_on(async {
        let mut engine = Engine::new();

        let a = Engine::process_incoming(&mut engine.incoming, engine.log.clone()).fuse();
        let b = Engine::process_logic(&mut engine.outgoing, engine.log).fuse();

        futures::pin_mut!(a, b);

        select! {
            _ = a => {},
            _ = b => {},
        }
    });
}

我放了整个例子here。 这是一个可行的解决方案,只需要注意您应该在依赖项中添加 futuresfutures-timer