如何将 "Message" 发送回 "Higher" 结构?

How can I send a "Message" back to a "Higher" Struct?

我目前有 2 个文件:Main.rs 和 Connection.rs。

Connection.rs 目前包含对 TcpStream 进行 Send(发送)、Listen(监听)和 Connect(连接)的能力。

Connection.rs

use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use std::sync::Arc;
use iced_futures::futures;

/// A message received from a connection, tagged by kind.
///
/// Each data-carrying variant holds a `usize` and a `String`.
/// NOTE(review): presumably the `usize` is the originating connection's `loc`
/// and the `String` is the payload — confirm against the code that builds these.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnMessage {
    /// A "code" message.
    Code(usize, String),
    /// A "chat" message.
    Chat(usize, String),
    /// A "view" message.
    View(usize, String),
    /// No message.
    None,
}


/// Failure modes of a [`Connection`] operation.
///
/// Every variant carries the `loc` value of the connection involved, so a caller
/// holding several connections can tell which one failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// Establishing the TCP connection failed.
    ConnectError(usize),
    /// Writing to the connection failed.
    SendError(usize),
    /// Reading from (or decoding data of) the connection failed.
    ReadError(usize),
}

impl std::fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ConnectionError::ConnectError(loc) => {
                write!(f, "connection {} failed to connect", loc)
            }
            ConnectionError::SendError(loc) => write!(f, "connection {} failed to send", loc),
            ConnectionError::ReadError(loc) => write!(f, "connection {} failed to read", loc),
        }
    }
}

impl std::error::Error for ConnectionError {}

/// A handle to one TCP connection whose stream has been split into a read half
/// and a write half.
///
/// Both halves sit behind `Arc<Mutex<..>>`, so clones of this struct refer to
/// the same underlying halves rather than to independent sockets.
#[derive(Debug, Clone)]
pub struct Connection {
    /// Write half of the split stream, guarded by an async mutex.
    pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
    /// Read half of the split stream, guarded by an async mutex.
    pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
    /// Caller-supplied value stored by `connect` and embedded in every
    /// `ConnectionError` raised for this connection.
    pub loc: usize,
    /// Initialised to the empty string by `connect`.
    /// NOTE(review): its intended meaning is not visible in this file — confirm.
    pub code: String,
}

impl Connection {
    /// Opens a TCP connection to the hard-coded server address and splits it
    /// into shareable read and write halves.
    ///
    /// `loc` is stored on the returned `Connection` and embedded in any error.
    ///
    /// # Errors
    /// Returns `ConnectionError::ConnectError(loc)` if the TCP connect fails.
    pub async fn connect(loc: usize) -> Result<Connection, ConnectionError> {
        let socket = TcpStream::connect("3.92.0.221:80")
            .await
            .map_err(|_| ConnectionError::ConnectError(loc))?;

        let (rd, wr) = tokio::io::split(socket);

        Ok(Connection {
            write_stream: Arc::new(Mutex::new(wr)),
            read_stream: Arc::new(Mutex::new(rd)),
            loc,
            code: String::new(),
        })
    }

    /// Spawns a background task that reads from `conn` in 16-byte chunks and
    /// accumulates the decoded text until the end-of-message delimiter is seen.
    ///
    /// The task ends when the peer closes the connection (a read of zero bytes)
    /// or when a read fails. The spawned task is detached; this function itself
    /// always returns `Ok(())`.
    pub fn listen(conn: Connection) -> Result<(), ConnectionError> {
        tokio::spawn(async move {
            let mut message = String::new();
            loop {
                let mut buf = [0u8; 16];

                // Hold the lock only for the read itself, not while processing.
                let read_result = {
                    let mut rd = conn.read_stream.lock().await;
                    rd.read(&mut buf).await
                };

                let len = match read_result {
                    // Zero bytes read into a non-empty buffer means EOF. Stop here
                    // instead of spinning on a closed socket.
                    Ok(0) => {
                        println!("Disconnected");
                        break;
                    }
                    Ok(len) => len,
                    // Report the failure and stop rather than panicking the task.
                    Err(_) => {
                        println!("Read Error");
                        break;
                    }
                };

                // Decode only the bytes actually received; the rest of `buf` is
                // zero padding and must not leak into the message.
                // NOTE: a multi-byte UTF-8 character split across two reads is
                // still rejected here as "String Result Error".
                match std::str::from_utf8(&buf[..len]) {
                    Ok(string_contents) => {
                        println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);

                        message.push_str(string_contents);

                        // End of Message - Parse and Reset
                        // NOTE(review): the original literal `"\."` is not a valid
                        // Rust escape; the delimiter is assumed to be a backslash
                        // followed by a dot — confirm against the server protocol.
                        if message.contains("\\.") {
                            println!("EOM");
                            let _complete = message.replace("\\.", "");
                            // Send `_complete` to Message inside Main.rs
                            message.clear();
                            println!("Resetting Msg");
                        } else {
                            println!("Not end of message");
                        }
                    }
                    Err(_) => println!("String Result Error"),
                }
            }
        });

        Ok(())
    }

    /// Writes all bytes of `string` to the connection's write half.
    ///
    /// # Errors
    /// Returns `ConnectionError::SendError(connection.loc)` if the write fails.
    pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
        let mut stream = connection.write_stream.lock().await;
        stream
            .write_all(string.as_bytes())
            .await
            .map_err(|_| ConnectionError::SendError(connection.loc))
    }
}

Main.rs 目前包含 Iced 的使用,包括我创建的用于在按下按钮时建立连接的 GUI。

use iced::{
    pane_grid, PaneGrid, Application, Settings, executor, Command, Container, Length, Element, 
    scrollable, button, Align, HorizontalAlignment, Column, Scrollable, Text, Button, Row,
    text_input, TextInput, 
};

use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
//use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
//use futures::prelude::*;
use std::sync::Arc;

mod Connection;

/// Program entry point: runs the `ClientAdminGUI` application with default settings.
pub fn main() -> iced::Result {
    let settings = Settings::default();
    ClientAdminGUI::run(settings)
}

/// Application state for the admin GUI: three pane-grid states plus the
/// connections opened so far.
struct ClientAdminGUI {
    /// Pane-grid state holding the sessions pane content.
    sessions_pane: pane_grid::State<ClientAdminSessionPaneContent>,
    /// Pane-grid state holding the chat pane content.
    chat_pane: pane_grid::State<ClientAdminChatPaneContent>, 
    /// Pane-grid state holding the image viewer pane content.
    image_viewer_pane: pane_grid::State<ClientAdminViewerPaneContent>,
    /// Connections pushed by `update` when a `Message::Session(Ok(..))` arrives.
    connections: Vec<Connection::Connection>,
    /// Starts as `None`.
    /// NOTE(review): presumably the currently selected connection — confirm.
    cur_connection: Option<Connection::Connection>,
}

/// Messages handled by `ClientAdminGUI::update`.
#[derive(Debug, Clone)]
enum Message {
    /// Result of an attempt to open a connection; on `Ok` the connection is
    /// stored and a listener is started for it.
    Session(Result<Connection::Connection, Connection::ConnectionError>), //Async Handler
    /// NOTE(review): presumably a request to send the current input — confirm.
    SendMessage,
    /// Result of an asynchronous send.
    Send(Result<(), Connection::ConnectionError>), //Async Handler
    /// NOTE(review): presumably the new contents of a text input — confirm.
    InputChanged(String),
    /// A button event, carrying a `usize` and the button-specific message.
    Button(usize, ButtonMessage),
    //None,
    /// Carries a `String`; this is the variant the question wants `listen` to feed.
    UpdateCode(String),
    /// Result of reading a `ConnMessage` from a connection.
    ReadConnMessage(Result<Connection::ConnMessage, ()>),
}

impl Application for ClientAdminGUI {
    type Message = Message;
    type Executor = executor::Default;
    type Flags = ();

    /// Builds the initial state: one pane-grid state per pane, no connections,
    /// and no startup command.
    fn new(_flags: ()) -> (Self, Command<Message>) {
        let (sessions_pane, _) = pane_grid::State::new(ClientAdminSessionPaneContent::new());
        let (chat_pane, _) = pane_grid::State::new(ClientAdminChatPaneContent::new());
        let (image_viewer_pane, _) = pane_grid::State::new(ClientAdminViewerPaneContent::new());

        (
            ClientAdminGUI {
                sessions_pane,
                chat_pane,
                image_viewer_pane,
                connections: Vec::new(),
                cur_connection: None,
            },
            Command::none(),
        )
    }

    /// Window title.
    fn title(&self) -> String {
        String::from("Client Admin GUI")
    }

    /// Applies one `Message` to the application state.
    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            Message::Session(Ok(result)) => {
                // result is a connection

                // Take the listener's handle before storing the connection, so
                // there is no need to index back into the vector afterwards.
                let listener = result.clone();
                self.connections.push(result);

                // ...
                // The returned `Result` is discarded on purpose.
                let _ = Connection::Connection::listen(listener);
            }
            ... //For all the rest of `Message`s 
        }
    }
}

在我的 listen 函数中,我从其中的一个连接得到回复。但是,我不完全确定如何将其反馈给应用程序以便能够对其执行一些操作。

问题:

如何将 Connection.rs 中 listen 函数获取到的数据作为 Message 发送回我的 main.rs——比如发送给我的 Message::UpdateCode(String)?

解决方法:我只需要将一个闭包(closure)作为参数传递给 listen 函数。在此基础上,还必须确保闭包的生命周期约束正确,并且它接收正确类型的参数。

您可以为函数声明一个泛型类型参数,然后使用 where 子句约束该泛型的确切类型(例如 Fn(String, usize) + Send + 'static)来完成此操作。

Connection.rs

use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use std::sync::Arc;
use iced_futures::futures;

/// A message received from a connection, tagged by kind.
///
/// Each data-carrying variant holds a `usize` and a `String`.
/// NOTE(review): presumably the `usize` is the originating connection's `loc`
/// and the `String` is the payload — confirm against the code that builds these.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnMessage {
    /// A "code" message.
    Code(usize, String),
    /// A "chat" message.
    Chat(usize, String),
    /// A "view" message.
    View(usize, String),
    /// No message.
    None,
}


/// Failure modes of a [`Connection`] operation.
///
/// Every variant carries the `loc` value of the connection involved, so a caller
/// holding several connections can tell which one failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// Establishing the TCP connection failed.
    ConnectError(usize),
    /// Writing to the connection failed.
    SendError(usize),
    /// Reading from (or decoding data of) the connection failed.
    ReadError(usize),
}

impl std::fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ConnectionError::ConnectError(loc) => {
                write!(f, "connection {} failed to connect", loc)
            }
            ConnectionError::SendError(loc) => write!(f, "connection {} failed to send", loc),
            ConnectionError::ReadError(loc) => write!(f, "connection {} failed to read", loc),
        }
    }
}

impl std::error::Error for ConnectionError {}

/// A handle to one TCP connection whose stream has been split into a read half
/// and a write half.
///
/// Both halves sit behind `Arc<Mutex<..>>`, so clones of this struct refer to
/// the same underlying halves rather than to independent sockets.
#[derive(Debug, Clone)]
pub struct Connection {
    /// Write half of the split stream, guarded by an async mutex.
    pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
    /// Read half of the split stream, guarded by an async mutex.
    pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
    /// Caller-supplied value stored by `connect`, embedded in every
    /// `ConnectionError` and passed to the `listen` callback.
    pub loc: usize,
    /// Initialised to the empty string by `connect`.
    /// NOTE(review): its intended meaning is not visible in this file — confirm.
    pub code: String,
}

impl Connection {
    /// Opens a TCP connection to the hard-coded server address and splits it
    /// into shareable read and write halves.
    ///
    /// `loc` is stored on the returned `Connection` and embedded in any error.
    ///
    /// # Errors
    /// Returns `ConnectionError::ConnectError(loc)` if the TCP connect fails.
    pub async fn connect(loc: usize) -> Result<Connection, ConnectionError> {
        let socket = TcpStream::connect("3.92.0.221:80")
            .await
            .map_err(|_| ConnectionError::ConnectError(loc))?;

        let (rd, wr) = tokio::io::split(socket);

        Ok(Connection {
            write_stream: Arc::new(Mutex::new(wr)),
            read_stream: Arc::new(Mutex::new(rd)),
            loc,
            code: String::new(),
        })
    }

    /// Spawns a background task that reads from `conn` in 16-byte chunks,
    /// accumulates the decoded text, and hands each completed message to
    /// `read_message`.
    ///
    /// `read_message` is called from the spawned task with the accumulated text
    /// (end-of-message delimiter removed) and `conn.loc`. It must be `Send` and
    /// `'static` because it is moved into the spawned task.
    ///
    /// The task ends when the peer closes the connection (a read of zero bytes)
    /// or when a read fails.
    pub fn listen<F>(conn: Connection, read_message: F) where F: Fn(String, usize) + 'static + std::marker::Send {
        tokio::spawn(async move {
            let mut message = String::new();
            loop {
                let mut buf = [0u8; 16];

                // Hold the lock only for the read itself, not while processing
                // or while the callback runs.
                let read_result = {
                    let mut rd = conn.read_stream.lock().await;
                    rd.read(&mut buf).await
                };

                let len = match read_result {
                    // Zero bytes read into a non-empty buffer means EOF. Stop here
                    // instead of spinning on a closed socket.
                    Ok(0) => {
                        println!("Disconnected");
                        break;
                    }
                    Ok(len) => len,
                    // Report the failure and stop rather than panicking the task.
                    Err(_) => {
                        println!("Read Error");
                        break;
                    }
                };

                // Decode only the bytes actually received; the rest of `buf` is
                // zero padding and must not leak into the message.
                // NOTE: a multi-byte UTF-8 character split across two reads is
                // still rejected here as "String Result Error".
                match std::str::from_utf8(&buf[..len]) {
                    Ok(string_contents) => {
                        println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);

                        message.push_str(string_contents);

                        // End of Message - Parse and Reset
                        // NOTE(review): the original literal `"\."` is not a valid
                        // Rust escape; the delimiter is assumed to be a backslash
                        // followed by a dot — confirm against the server protocol.
                        if message.contains("\\.") {
                            println!("EOM");
                            read_message(message.replace("\\.", ""), conn.loc);
                            message.clear();
                            println!("Resetting Msg");
                        } else {
                            println!("Not end of message");
                        }
                    }
                    Err(_) => println!("String Result Error"),
                }
            }
        });
    }

    /// Writes all bytes of `string` to the connection's write half.
    ///
    /// # Errors
    /// Returns `ConnectionError::SendError(connection.loc)` if the write fails.
    pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
        let mut stream = connection.write_stream.lock().await;
        stream
            .write_all(string.as_bytes())
            .await
            .map_err(|_| ConnectionError::SendError(connection.loc))
    }
}

Main.rs(仅相关部分)

Connection::Connection::listen(Some(self.connections[self.connections.len()-1].clone()).unwrap(), 
                    (|string:String, loc:usize| {
                        println!("String is: {} -- loc: {}", string, loc);
                     }));