如何使用 tokio async TcpStream 将 bevy 游戏连接到外部 TCP 服务器?

How to connect bevy game to externel TCP server using tokios async TcpStream?

我想在游戏客户端和服务器之间发送事件,我已经让它工作了,但我不知道如何用 bevy 来做。

我依赖于使用 tokios async TcpStream,因为我必须能够使用 stream.into_split() 将流拆分为 OwnedWriteHalfOwnedReadhalf

我的第一个想法是生成一个处理连接的线程,然后使用 mpsc::channel

将接收到的事件发送到队列

然后我使用 app.insert_resource(Queue) 将这个队列包含到 bevy 资源中,并在游戏循环中从中提取事件。

队列:

use tokio::sync::mpsc;

pub enum Instruction {
    Push(GameEvent),
    Pull(mpsc::Sender<Option<GameEvent>>),
}

#[derive(Clone, Debug)]
pub struct Queue {
    sender: mpsc::Sender<Instruction>,
}
impl Queue {
    pub fn init() -> Self {
        let (tx, rx) = mpsc::channel(1024);
        init(rx);
        Self{sender: tx}
    }
    pub async fn send(&self, event: GameEvent) {
        self.sender.send(Instruction::Push(event)).await.unwrap();
    }
    pub async fn pull(&self) -> Option<GameEvent> {
        println!("new pull");
        let (tx, mut rx) = mpsc::channel(1);
        self.sender.send(Instruction::Pull(tx)).await.unwrap();
        rx.recv().await.unwrap()
    }
}

fn init(mut rx: mpsc::Receiver<Instruction>) {
    tokio::spawn(async move {
        let mut queue: Vec<GameEvent> = Vec::new();

        loop {
            match rx.recv().await.unwrap() {
                Instruction::Push(ev) => {
                    queue.push(ev);
                }
                Instruction::Pull(sender) => {
                    sender.send(queue.pop()).await.unwrap();
                }
            }
        }
    });
}

但是因为所有这些都必须是异步的,所以我在同步游戏循环中屏蔽了 pull() 函数。 我使用 futures-lite crate:

fn event_pull(
    communication: Res<Communication>
) {
    let ev = future::block_on(communication.event_queue.pull());
    println!("got event: {:?}", ev);
}

这工作正常,但是大约 5 秒后整个程序停止并且不再接收任何事件。

似乎 future::block_on() 会无限期阻塞。

拥有主函数,其中构建了 bevy::prelude::App 和 运行,作为异步 tokio::main 函数在这里也可能是个问题。

最好将异步 TcpStream 初始化和 tokio::sync::mpsc::Sender 以及 Queue.pull 包装到同步函数中,但我不知道该怎么做。

有人可以帮忙吗?

如何重现

可以找到 repo here

只需编译 serverclient,然后按相同的顺序编译 运行。

我通过将每个 tokio::sync::mpsc 替换为 crossbeam::channel 来让它工作,这可能是个问题,因为它会阻塞

并手动初始化 tokio 运行时。

因此初始化代码如下所示:

pub struct Communicator {
    pub event_bridge: bridge::Bridge,
    pub event_queue: event_queue::Queue,
    _runtime: Runtime,
}
impl Communicator {
    pub fn init(ip: &str) -> Self {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_io()
            .build()
            .unwrap();
    
        let (bridge, queue, game_rx) = rt.block_on(async move {
            let socket = TcpStream::connect(ip).await.unwrap();
            let (read, write) = socket.into_split();
            let reader = TcpReader::new(read);
            let writer = TcpWriter::new(write);
        
            let (bridge, tcp_rx, game_rx) = bridge::Bridge::init(); 
            reader::init(bridge.clone(), reader);
            writer::init(tcp_rx, writer);
        
            let event_queue = event_queue::Queue::init();
        
            return (bridge, event_queue, game_rx);
        });
    
        // game of game_rx events to queue for game loop
        let eq_clone = queue.clone();
        rt.spawn(async move {
            loop {
                let event = game_rx.recv().unwrap(); 
                eq_clone.send(event);
            }
        });
    
        Self {
            event_bridge: bridge,
            event_queue: queue,
            _runtime: rt,
        }
    }
}

main.rs 看起来像这样:

fn main() {
    let communicator = communication::Communicator::init("0.0.0.0:8000");

    communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});

    App::new()
        .insert_resource(communicator)
        .add_system(event_pull)
        .add_plugins(DefaultPlugins)
        .run();
}

fn event_pull(
    communication: Res<communication::Communicator>
) {
    let ev = communication.event_queue.pull();
    if let Some(ev) = ev {
        println!("ev");
    }
}

也许会有更好的解决方案。