如何使用 tokio async TcpStream 将 bevy 游戏连接到外部 TCP 服务器?
How to connect bevy game to externel TCP server using tokios async TcpStream?
我想在游戏客户端和服务器之间发送事件,我已经让它工作了,但我不知道如何用 bevy 来做。
我依赖于使用 tokios async TcpStream
,因为我必须能够使用 stream.into_split()
将流拆分为 OwnedWriteHalf
和 OwnedReadhalf
。
我的第一个想法是生成一个处理连接的线程,然后使用 mpsc::channel
将接收到的事件发送到队列
然后我使用 app.insert_resource(Queue)
将这个队列包含到 bevy 资源中,并在游戏循环中从中提取事件。
队列:
use tokio::sync::mpsc;
pub enum Instruction {
Push(GameEvent),
Pull(mpsc::Sender<Option<GameEvent>>),
}
#[derive(Clone, Debug)]
pub struct Queue {
sender: mpsc::Sender<Instruction>,
}
impl Queue {
pub fn init() -> Self {
let (tx, rx) = mpsc::channel(1024);
init(rx);
Self{sender: tx}
}
pub async fn send(&self, event: GameEvent) {
self.sender.send(Instruction::Push(event)).await.unwrap();
}
pub async fn pull(&self) -> Option<GameEvent> {
println!("new pull");
let (tx, mut rx) = mpsc::channel(1);
self.sender.send(Instruction::Pull(tx)).await.unwrap();
rx.recv().await.unwrap()
}
}
fn init(mut rx: mpsc::Receiver<Instruction>) {
tokio::spawn(async move {
let mut queue: Vec<GameEvent> = Vec::new();
loop {
match rx.recv().await.unwrap() {
Instruction::Push(ev) => {
queue.push(ev);
}
Instruction::Pull(sender) => {
sender.send(queue.pop()).await.unwrap();
}
}
}
});
}
但是因为所有这些都必须是异步的,所以我在同步游戏循环中屏蔽了 pull()
函数。
我使用 futures-lite
crate:
fn event_pull(
communication: Res<Communication>
) {
let ev = future::block_on(communication.event_queue.pull());
println!("got event: {:?}", ev);
}
这工作正常,但是大约 5 秒后整个程序停止并且不再接收任何事件。
似乎 future::block_on()
会无限期阻塞。
拥有主函数,其中构建了 bevy::prelude::App
和 运行,作为异步 tokio::main
函数在这里也可能是个问题。
最好将异步 TcpStream
初始化和 tokio::sync::mpsc::Sender
以及 Queue.pull
包装到同步函数中,但我不知道该怎么做。
有人可以帮忙吗?
如何重现
可以找到 repo here
只需编译 server
和 client
,然后按相同的顺序编译 运行。
我通过将每个 tokio::sync::mpsc
替换为 crossbeam::channel
来让它工作,这可能是个问题,因为它会阻塞
并手动初始化 tokio 运行时。
因此初始化代码如下所示:
pub struct Communicator {
pub event_bridge: bridge::Bridge,
pub event_queue: event_queue::Queue,
_runtime: Runtime,
}
impl Communicator {
pub fn init(ip: &str) -> Self {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.build()
.unwrap();
let (bridge, queue, game_rx) = rt.block_on(async move {
let socket = TcpStream::connect(ip).await.unwrap();
let (read, write) = socket.into_split();
let reader = TcpReader::new(read);
let writer = TcpWriter::new(write);
let (bridge, tcp_rx, game_rx) = bridge::Bridge::init();
reader::init(bridge.clone(), reader);
writer::init(tcp_rx, writer);
let event_queue = event_queue::Queue::init();
return (bridge, event_queue, game_rx);
});
// game of game_rx events to queue for game loop
let eq_clone = queue.clone();
rt.spawn(async move {
loop {
let event = game_rx.recv().unwrap();
eq_clone.send(event);
}
});
Self {
event_bridge: bridge,
event_queue: queue,
_runtime: rt,
}
}
}
而 main.rs
看起来像这样:
fn main() {
let communicator = communication::Communicator::init("0.0.0.0:8000");
communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});
App::new()
.insert_resource(communicator)
.add_system(event_pull)
.add_plugins(DefaultPlugins)
.run();
}
fn event_pull(
communication: Res<communication::Communicator>
) {
let ev = communication.event_queue.pull();
if let Some(ev) = ev {
println!("ev");
}
}
也许会有更好的解决方案。
我想在游戏客户端和服务器之间发送事件,我已经让它工作了,但我不知道如何用 bevy 来做。
我依赖于使用 tokios async TcpStream
,因为我必须能够使用 stream.into_split()
将流拆分为 OwnedWriteHalf
和 OwnedReadhalf
。
我的第一个想法是生成一个处理连接的线程,然后使用 mpsc::channel
然后我使用 app.insert_resource(Queue)
将这个队列包含到 bevy 资源中,并在游戏循环中从中提取事件。
队列:
use tokio::sync::mpsc;
pub enum Instruction {
Push(GameEvent),
Pull(mpsc::Sender<Option<GameEvent>>),
}
#[derive(Clone, Debug)]
pub struct Queue {
sender: mpsc::Sender<Instruction>,
}
impl Queue {
pub fn init() -> Self {
let (tx, rx) = mpsc::channel(1024);
init(rx);
Self{sender: tx}
}
pub async fn send(&self, event: GameEvent) {
self.sender.send(Instruction::Push(event)).await.unwrap();
}
pub async fn pull(&self) -> Option<GameEvent> {
println!("new pull");
let (tx, mut rx) = mpsc::channel(1);
self.sender.send(Instruction::Pull(tx)).await.unwrap();
rx.recv().await.unwrap()
}
}
fn init(mut rx: mpsc::Receiver<Instruction>) {
tokio::spawn(async move {
let mut queue: Vec<GameEvent> = Vec::new();
loop {
match rx.recv().await.unwrap() {
Instruction::Push(ev) => {
queue.push(ev);
}
Instruction::Pull(sender) => {
sender.send(queue.pop()).await.unwrap();
}
}
}
});
}
但是因为所有这些都必须是异步的,所以我在同步游戏循环中屏蔽了 pull()
函数。
我使用 futures-lite
crate:
fn event_pull(
communication: Res<Communication>
) {
let ev = future::block_on(communication.event_queue.pull());
println!("got event: {:?}", ev);
}
这工作正常,但是大约 5 秒后整个程序停止并且不再接收任何事件。
似乎 future::block_on()
会无限期阻塞。
拥有主函数,其中构建了 bevy::prelude::App
和 运行,作为异步 tokio::main
函数在这里也可能是个问题。
最好将异步 TcpStream
初始化和 tokio::sync::mpsc::Sender
以及 Queue.pull
包装到同步函数中,但我不知道该怎么做。
有人可以帮忙吗?
如何重现
可以找到 repo here
只需编译 server
和 client
,然后按相同的顺序编译 运行。
我通过将每个 tokio::sync::mpsc
替换为 crossbeam::channel
来让它工作,这可能是个问题,因为它会阻塞
并手动初始化 tokio 运行时。
因此初始化代码如下所示:
pub struct Communicator {
pub event_bridge: bridge::Bridge,
pub event_queue: event_queue::Queue,
_runtime: Runtime,
}
impl Communicator {
pub fn init(ip: &str) -> Self {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.build()
.unwrap();
let (bridge, queue, game_rx) = rt.block_on(async move {
let socket = TcpStream::connect(ip).await.unwrap();
let (read, write) = socket.into_split();
let reader = TcpReader::new(read);
let writer = TcpWriter::new(write);
let (bridge, tcp_rx, game_rx) = bridge::Bridge::init();
reader::init(bridge.clone(), reader);
writer::init(tcp_rx, writer);
let event_queue = event_queue::Queue::init();
return (bridge, event_queue, game_rx);
});
// game of game_rx events to queue for game loop
let eq_clone = queue.clone();
rt.spawn(async move {
loop {
let event = game_rx.recv().unwrap();
eq_clone.send(event);
}
});
Self {
event_bridge: bridge,
event_queue: queue,
_runtime: rt,
}
}
}
而 main.rs
看起来像这样:
fn main() {
let communicator = communication::Communicator::init("0.0.0.0:8000");
communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});
App::new()
.insert_resource(communicator)
.add_system(event_pull)
.add_plugins(DefaultPlugins)
.run();
}
fn event_pull(
communication: Res<communication::Communicator>
) {
let ev = communication.event_queue.pull();
if let Some(ev) = ev {
println!("ev");
}
}
也许会有更好的解决方案。