要将这段异步代码泛型化,需要哪些生命周期和 trait 约束(bounds)?

What lifetimes and bounds are needed to generalize this async code?

我这里有使用 tokio 和 serde 的 websocket 代码:

use async_once::AsyncOnce;
use common_wasm::models::status::{CommandMessage, StatusMessage};
use futures_util::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use std::{collections::VecDeque, net::SocketAddr};
use tokio::{
    net::{TcpListener, TcpStream}, sync::{broadcast, mpsc}
};
use tokio_tungstenite::{
    accept_async, tungstenite::{Error, Message, Result}
};
use tracing::*;

// 
lazy_static! {
    /// Process-wide status WebSocket server, wrapped in `AsyncOnce` so that the
    /// async initializer below can be awaited.
    ///
    /// The address is written as `host:port` because `StatusWs::init` hands it
    /// unchanged to `TcpListener::bind`, which resolves socket addresses and does
    /// not understand a `ws://` URL scheme.
    pub static ref STATUS_REPORTER: AsyncOnce<StatusWs> = AsyncOnce::new(async {
        info!("Init lazy static WS");
        StatusWs::init("localhost:44444").await
    });
}

use StatusMessage as SenderType;
use CommandMessage as ReceiveType;

/// Handle to the status WebSocket server: it collects the messages sent by
/// connected clients and broadcasts status messages to them.
pub struct StatusWs {
    /// Client messages that were received but not yet returned by `next`.
    buf: VecDeque<ReceiveType>,
    /// Receiving end of the channel the connection tasks push client messages into.
    rx_client_msg: mpsc::Receiver<ReceiveType>,
    /// Broadcast sender used by `reportinfo`; every connection subscribes to it.
    tx_server_msg: broadcast::Sender<SenderType>,
}

impl StatusWs {
    /// Binds a TCP listener on `addr` and starts serving WebSocket clients.
    ///
    /// A background task accepts connections; each peer is served by its own
    /// task running `accept_connection`, which receives a sender for client
    /// messages and a fresh subscription to the status broadcast.
    ///
    /// # Panics
    ///
    /// Panics with "Can't listen" if `addr` cannot be bound.
    pub async fn init(addr: &str) -> StatusWs {
        info!("Init Status WS on {}", addr);

        let listener = TcpListener::bind(addr).await.expect("Can't listen");

        // Client -> server: connection tasks send on `tx_client_msg`, the server reads `rx_client_msg`.
        let (tx_client_msg, rx_client_msg) = mpsc::channel::<ReceiveType>(32);

        // Server -> clients: the server sends on `tx_server_msg`, each connection subscribes its own receiver.
        let (tx_server_msg, _rx_server_msg) = broadcast::channel::<SenderType>(10);

        // The accept loop takes ownership of one sender; this clone stays with the returned handle.
        let tx_server_2 = tx_server_msg.clone();
        tokio::spawn(async move {
            while let Ok((stream, peer)) = listener.accept().await {
                info!("Peer address connected: {}", peer);

                let tx_client = tx_client_msg.clone();
                let rx_server = tx_server_msg.subscribe();
                tokio::spawn(async move {
                    accept_connection(peer, stream, tx_client, rx_server).await;
                });
            }
        });

        StatusWs { buf: VecDeque::new(), rx_client_msg, tx_server_msg: tx_server_2 }
    }

    /// Broadcasts a copy of `msg` to every currently subscribed connection.
    ///
    /// Delivery is best effort: a send error is deliberately ignored.
    pub async fn reportinfo(&self, msg: &SenderType) {
        let _ = self.tx_server_msg.send(msg.clone());
    }

    /// Returns the next client message, waiting until one is available.
    ///
    /// Buffered messages are returned first. `Ok(None)` is returned once the
    /// client channel is closed, i.e. every sender (the accept loop and all
    /// connection tasks) has been dropped and no further message can arrive.
    pub async fn next(&mut self) -> Result<Option<ReceiveType>> {
        loop {
            // If buffer contains data, we can directly return it.
            if let Some(data) = self.buf.pop_front() {
                return Ok(Some(data));
            }

            // Fetch new response if buffer is empty; stop when the channel is closed.
            match self.next_response().await? {
                // Handle the response, possibly adding to the buffer
                Some(response) => self.handle_response(response)?,
                None => return Ok(None),
            }
        }
    }

    /// Waits for the next message on the client channel.
    ///
    /// Returns `Ok(None)` when the channel is closed. A single-branch
    /// `tokio::select!` with a `Some(..)` pattern and no `else` branch would
    /// panic in that situation, so the receiver is awaited directly.
    async fn next_response(&mut self) -> Result<Option<ReceiveType>> {
        Ok(self.rx_client_msg.recv().await)
    }

    /// Queues `response` at the back of the buffer drained by `next`.
    fn handle_response(&mut self, response: ReceiveType) -> Result<()> {
        self.buf.push_back(response);
        Ok(())
    }
}

async fn accept_connection(peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, rx_server: broadcast::Receiver<SenderType>) {
    info!("Accepting connection from {}", peer);
    if let Err(e) = handle_connection(peer, stream, tx_client, rx_server).await {
        match e {
            Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => error!("Connection closed"),
            err => error!("Error processing connection: {}", err),
        }
    }
}

/// Runs the WebSocket session for a single peer until it ends.
///
/// Text frames from the peer are deserialized from JSON into `ReceiveType` and
/// forwarded on `tx_client`; messages received from `rx_server` are serialized
/// to JSON and sent to the peer as text frames. The loop stops when the peer
/// sends a close frame or the stream ends.
///
/// # Errors
///
/// Returns the error if the WebSocket handshake fails, if reading a frame
/// fails, or if sending a frame to the peer fails.
async fn handle_connection(
    _peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, mut rx_server: broadcast::Receiver<SenderType>,
) -> Result<()> {
    // A failed handshake is returned to the caller instead of panicking the task:
    // the peer is untrusted and may send anything.
    let ws_stream = accept_async(stream).await?;
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    loop {
        tokio::select! {
            remote_msg = ws_receiver.next() => {
                let msg = match remote_msg {
                    Some(msg) => msg?,
                    // Stream exhausted: the peer is gone.
                    None => break,
                };
                match msg {
                    Message::Text(resptxt) => {
                        match serde_json::from_str::<ReceiveType>(&resptxt) {
                            // Forwarding is best effort; a send error is ignored.
                            Ok(cmd) => { let _ = tx_client.send(cmd).await; },
                            Err(err) => error!("Error deserializing: {}", err),
                        }
                    },
                    Message::Close(_) => break,
                    _ => { },
                }
            }
            Ok(msg) = rx_server.recv() => {
                match serde_json::to_string(&msg) {
                    Ok(txt) => ws_sender.send(Message::Text(txt)).await?,
                    // Skip a message that cannot be serialized rather than aborting the task.
                    Err(err) => error!("Error serializing: {}", err),
                }
            }
        }
    }

    Ok(())
}

发送和接收的消息类型都很简单(它们内部嵌套的类型也全是简单类型):

use std::{collections::BTreeMap, fmt::Debug};



use serde::{Deserialize, Serialize};

/// Status message; serializable and deserializable via serde.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct StatusMessage {
    /// Name carried by the message.
    pub name: String,
    /// Entries keyed by an `i32`, kept in key order by the `BTreeMap`.
    // NOTE(review): the key presumably corresponds to `CommandMessage::entryid` — confirm.
    pub entries: BTreeMap<i32, GuiEntry>,
}

/// Command message; serializable and deserializable via serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommandMessage {
    /// Identifier of the sender, as a string.
    pub sender: String,
    /// Numeric id of the entry the command refers to.
    pub entryid: i32,
    /// Payload of the command.
    pub command: GuiValue,
}

现在我想把这段代码泛型化,以便创建一个可以使用其他 Sender 和 Receiver 消息类型的结构体。是的,我可以只修改别名,但我希望能够使用泛型类型参数,而不是复制整个文件。问题是,当我按照编译器的建议修改之后,最终走到了一个不知道下一步该怎么办的地步。编译器告诉我 resptxt 的存活时间不够长:

`resptxt` does not live long enough
borrowed value does not live long enough rustc(E0597)
status_ws.rs(133, 29): `resptxt` dropped here while still borrowed
status_ws.rs(115, 28): lifetime `'a` defined here
status_ws.rs(129, 39): argument requires that `resptxt` is borrowed for `'a`

这是我目前所拥有的:

use async_once::AsyncOnce;
use common_wasm::models::status::{CommandMessage, StatusMessage};
use futures_util::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use serde::{Serialize, Deserialize};
use std::{collections::VecDeque, net::SocketAddr};
use tokio::{
    net::{TcpListener, TcpStream}, sync::{broadcast, mpsc}
};
use tokio_tungstenite::{
    accept_async, tungstenite::{Error, Message, Result}
};
use tracing::*;

// 
lazy_static! {
    /// Global status server, instantiating the generic `StatusWs` with
    /// `CommandMessage` as the received type and `StatusMessage` as the sent type.
    // NOTE(review): the address carries a `ws://` prefix but `init` passes it
    // straight to `TcpListener::bind` — confirm that it resolves as intended.
    pub static ref STATUS_REPORTER: AsyncOnce<StatusWs<CommandMessage, StatusMessage>> = AsyncOnce::new(async {
        info!("Init lazy static WS");
        let server = StatusWs::init("ws://localhost:44444").await;
        server
    });
}

// use StatusMessage as SenderType;
// use CommandMessage as ReceiveType;

/// Generic status WebSocket server handle.
///
/// `ReceiveType` is the message type read from clients and `SenderType` is the
/// message type broadcast to them. No bounds are placed on the struct itself.
pub struct StatusWs<ReceiveType, SenderType> {
    /// Client messages that were received but not yet returned by `next`.
    buf: VecDeque<ReceiveType>,
    /// Receiving end of the channel the connection tasks push client messages into.
    rx_client_msg: mpsc::Receiver<ReceiveType>,
    /// Broadcast sender used by `reportinfo`; every connection subscribes to it.
    tx_server_msg: broadcast::Sender<SenderType>,
}

// NOTE(review): this is the attempt that does not compile. `Deserialize<'a>`
// ties `ReceiveType` to a caller-chosen lifetime `'a`, and neither type
// parameter is bounded by `'static` although both are moved into
// `tokio::spawn` tasks in `init`.
impl <'a, ReceiveType: Deserialize<'a> + Send, SenderType: Serialize + Clone + Send + Sync> StatusWs <ReceiveType, SenderType> {
    /// Binds a TCP listener on `addr` and spawns the accept loop; every accepted
    /// peer is served by its own task running `accept_connection`.
    ///
    /// Panics with "Can't listen" if binding fails.
    pub async fn init(addr: &str) -> StatusWs<ReceiveType, SenderType> {
        info!("Init Status WS on {}", addr);

        let listener = TcpListener::bind(&addr).await.expect("Can't listen");

        // Clients produce to the server: they use the tx to send and the server uses the rx to read
        let (tx_client_msg, rx_client_msg) = mpsc::channel::<ReceiveType>(32);

        // spmc for server to broadcast status to listeners. Server uses tx to send and client uses rx to read
        let (tx_server_msg, _rx_server_msg) = broadcast::channel::<SenderType>(10);

        // The accept loop below takes ownership of `tx_server_msg`; this clone stays with the returned handle.
        let tx_server_2 = tx_server_msg.clone();
        tokio::spawn(async move {
            while let Ok((stream, peer)) = listener.accept().await {
                info!("Peer address connected: {}", peer);

                // Each connection gets its own sender clone and its own broadcast subscription.
                let tx_client = tx_client_msg.clone();
                let rx_server = tx_server_msg.subscribe();
                tokio::spawn(async move {
                    accept_connection(peer, stream, tx_client, rx_server).await;
                });
            }
        });

        StatusWs { buf: VecDeque::new(), rx_client_msg, tx_server_msg: tx_server_2 }
    }

    /// Broadcasts a clone of `msg` to the subscribed connections; the result of
    /// the send is ignored in both the `Ok` and the `Err` case.
    pub async fn reportinfo(&self, msg: &SenderType) {
        let my_msg = msg.clone();

        match &self.tx_server_msg.send(my_msg) {
            Ok(_size) => {
                //trace!("Server Sending OK {}", size)
            },
            Err(_err) => {
                //trace!("Server Sending ERR {:?}", err)
            },
        }
    }

    /// Returns the next client message, taking it from the buffer when possible
    /// and otherwise waiting for a new one. As written, this never returns `Ok(None)`.
    pub async fn next(&mut self) -> Result<Option<ReceiveType>> {
        loop {
            // If buffer contains data, we can directly return it.
            if let Some(data) = self.buf.pop_front() {
                return Ok(Some(data));
            }

            // Fetch new response if buffer is empty.
            let response = self.next_response().await?;

            // Handle the response, possibly adding to the buffer
            self.handle_response(response)?;
        }
    }

    /// Waits for the next message on the client channel and returns it.
    async fn next_response(&mut self) -> Result<ReceiveType> {
        loop {
            tokio::select! { // TODO don't need select if there's only one thing?
                Some(msg) = self.rx_client_msg.recv() => {
                    return Ok(msg)
                },
            }
        }
    }

    /// Appends `response` to the back of the buffer drained by `next`.
    fn handle_response(&mut self, response: ReceiveType) -> Result<()> {
        self.buf.push_back(response);
        Ok(())
    }
}

/// Serves one peer via `handle_connection` and logs the error it ends with, if any.
///
/// Closed connections, protocol errors and UTF-8 errors are all logged as
/// "Connection closed"; any other error is logged with its description.
// NOTE(review): carries the same `Deserialize<'a>` bound as `handle_connection`,
// which is the bound the compiler error in this question points at.
async fn accept_connection<'a, ReceiveType: Deserialize<'a>, SenderType: Clone + Serialize>(peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, rx_server: broadcast::Receiver<SenderType>) {
    info!("Accepting connection from {}", peer);
    if let Err(e) = handle_connection(peer, stream, tx_client, rx_server).await {
        match e {
            Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => error!("Connection closed"),
            err => error!("Error processing connection: {}", err),
        }
    }
}

/// Runs the WebSocket session for a single peer until it ends.
///
/// Text frames from the peer are deserialized from JSON into `ReceiveType` and
/// forwarded on `tx_client`; messages received from `rx_server` are serialized
/// to JSON and sent to the peer as text frames. The loop stops on a close
/// frame or when the stream ends. Panics with "Failed to accept" if the
/// WebSocket handshake fails.
// NOTE(review): `'a` is chosen by the caller, so it outlives anything created
// inside this function body.
async fn handle_connection<'a, ReceiveType: Deserialize<'a>, SenderType: Clone + Serialize>(
    _peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, mut rx_server: broadcast::Receiver<SenderType>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await.expect("Failed to accept");
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    loop {
        tokio::select! {
            remote_msg = ws_receiver.next() => {
                match remote_msg {
                    Some(msg) => {
                        let msg = msg?;
                        match msg {
                            Message::Text(resptxt) => {
                                // This is the borrow reported as E0597: `from_str` needs `&resptxt`
                                // to be valid for `'a`, but `resptxt` is dropped at the end of this arm.
                                match serde_json::from_str::<ReceiveType>(&resptxt) {
                                    // The result of forwarding to the server is ignored.
                                    Ok(cmd) => { let _ = tx_client.send(cmd).await; },
                                    Err(err) => error!("Error deserializing: {}", err),
                                }
                            },
                            Message::Close(_) => break,
                            _ => { },
                        }
                    }
                    // Stream exhausted: leave the loop.
                    None => break,
                }
            }
            Ok(msg) = rx_server.recv() => {
                match serde_json::to_string(&msg) {
                    Ok(txt) => ws_sender.send(Message::Text(txt)).await?,
                    // Serialization failure is not handled yet and panics via `todo!()`.
                    Err(_) => todo!(),
                }
            }
        }
    }

    Ok(())
}

我觉得自己对所需的生命周期和 trait 约束有些混淆,尤其是 Serde 的 Deserialize 上的生命周期参数,以及消息类型上的 Send/Sync 自动 trait(auto trait)标记。

无论如何,复制整个原始文件并修改别名虽然肯定可行,但显得有些蛮力;而且这里似乎有一些值得学习的东西。

您应该使用 serde::de::DeserializeOwned 而不是 Deserialize<'a>

Deserialize trait 带有一个生命周期参数,用来支持零拷贝(zero-copy)反序列化,但这里无法利用这一点,因为源数据 resptxt 是一个临时值,没有被保存到任何地方。DeserializeOwned trait 用于约束反序列化得到的类型不保留对源数据的引用,因此该值可以在源数据被销毁之后继续使用。

修复之后,您会收到新的错误:ReceiveType 和 SenderType 必须满足 'static 才能在 tokio::spawn 的任务中使用。添加该约束之后,您的代码就可以通过编译了。

为简洁起见,请参阅 playground 上的完整编译代码。