如何在像 Vec 这样的容器中管理 tokio oneshot::channel?

How to manage tokio oneshot::channel in containers like a Vec?

我想用容器来管理tokio::oneshot::Sender。我正在使用 Vec,但似乎 Vec 中保存的值是引用,我需要使用 self,而不是引用来调用它:

use bytes::BytesMut;
use tokio::sync::oneshot;

#[derive(Clone)]
pub enum ChannelData {
    Video { timestamp: u32, data: BytesMut },
    Audio { timestamp: u32, data: BytesMut },
    MetaData {},
}

pub type PlayerPublisher = oneshot::Sender<ChannelData>;

pub struct Channel {
    player_producers: Vec<PlayerPublisher>, // consumers who subscribe this channel.
}

impl Channel {
    fn new() -> Self {
        Self {
            player_producers: Vec::new(),
        }
    }

    async fn transmit(&mut self) {
        let b = BytesMut::new();
        let data = ChannelData::Video {
            timestamp: 234,
            data: b,
        };

        for i in self.player_producers {
            i.send(data);
        }
    }
}

错误:

error[E0507]: cannot move out of `self.player_producers` which is behind a mutable reference
  --> src/lib.rs:31:18
   |
31 |         for i in self.player_producers {
   |                  ^^^^^^^^^^^^^^^^^^^^^
   |                  |
   |                  move occurs because `self.player_producers` has type `Vec<tokio::sync::oneshot::Sender<ChannelData>>`, which does not implement the `Copy` trait
   |                  help: consider iterating over a slice of the `Vec<_>`'s content: `&self.player_producers`

error[E0382]: use of moved value: `data`
  --> src/lib.rs:32:20
   |
26 |         let data = ChannelData::Video {
   |             ---- move occurs because `data` has type `ChannelData`, which does not implement the `Copy` trait
...
32 |             i.send(data);
   |                    ^^^^ value moved here, in previous iteration of loop

我怎样才能实现我的目标?

pub fn send(mut self, t: T) -> Result<(), T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    Ok(())
}

调用 send 需要 oneshot 频道的所有权。要获得该所有权,您可以取得容器的所有权。在这种情况下,最简单的方法是取得 Channel:

的所有权
async fn transmit(self) { // Changed to `self`
    for i in self.player_producers {
        let data = ChannelData::Video {
            timestamp: 234,
            data: BytesMut::new(),
        };
        if i.send(data).is_err() {
            panic!("Unable to send data");
        }
    }
}

其他选择drain合集:

for i in self.player_producers.drain(..) {

swap the collection with an empty one:

use std::mem;
for i in mem::take(&mut self.player_producers) {

在每种情况下,data 有效负载在每次发送时都必须构建(或克隆)。

另请参阅:

  • How can I swap in a new value for a field in a mutable reference to a structure?

Tokio oneshot 发件人只能发送一条消息,因此 send 将消耗 Sender。要从 Vec 中调用 send,您首先必须 删除 它。在迭代它们时从 &mut Vec 中删除所有元素的方法是 drain it:

for i in self.player_producers.drain(..) {
    i.send(data);
}

您的另一个错误是 data 被调用 send 消耗掉了。由于您想将相同的数据发送给多个发件人,因此您必须 clone it:

i.send(data.clone());

注意 send returns a Result.

的警告