使用 Tokio futures 的多播 UDP 数据包
Multicast UDP packets using Tokio futures
我正在研究 Tokio 和 Rust,举个例子,我正在尝试编写一个简单的 UDP 代理,它只接受一个套接字上的 UDP 数据包并将其发送到多个其他目的地。但是,我偶然发现需要将接收到的数据包发送到多个地址的情况,但我不确定如何以惯用的方式做到这一点。
到目前为止我的代码:
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() {
let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
let socket = UdpSocket::bind(&listen_address).unwrap();
let peers = vec![
"192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
"192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
];
UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
move |(bytes, _from)| {
// These are the problematic lines
for peer in peers.iter() {
socket.send_dgram(&bytes, &peer);
}
Ok(())
},
)
};
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
}
有问题的线路正在尝试使用新绑定的套接字将接收到的数据包发送到多个其他地址。
现有示例都将数据包转发到单个目的地,或者在内部使用 mpsc 通道在内部任务之间进行通信。我不认为这是必要的,并且应该可以做到而不必为每个侦听套接字生成多个任务。
更新: 感谢@Ömer-erden,我得到了这段有效的代码。
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
let socket = UdpSocket::bind(&listen_address)?;
let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
let forwarder = reader.for_each(move |(bytes, _from)| {
for peer in peers.iter() {
writer.start_send((bytes.clone().into(), peer.clone()))?;
}
writer.poll_complete()?;
Ok(())
});
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
Ok(())
}
注意:
不需要为每个start_send
都调用poll_completion
:只需要在所有start_send
被调度后调用即可。
出于某种原因,peer
的内容在调用之间被删除(但没有编译器错误),生成错误 22(这通常是因为给出了错误的地址至 sendto(2)
).
查看调试器,很明显第二次,对等地址指向无效内存。我选择克隆 peer
。
我删除了对 unwrap()
的调用并改为向上传播 Result
。
你的代码有一个逻辑错误:你试图绑定同一个地址两次,分别作为发送者和接收者。相反,您可以使用 流和接收器 。 UdpFramed
具有提供该功能的功能,请参阅 Sink
:
A Sink
is a value into which other values can be sent, asynchronously.
let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
let (mut socket_sink, socket_stream) =
UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).split();
let peers = vec![
"192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
"192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
];
socket_stream.for_each(move |(bytes, _from)| {
for peer in peers.iter() {
socket_sink.start_send((bytes.clone().into(), *peer));
socket_sink.poll_complete();
}
Ok(())
})
};
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
我正在研究 Tokio 和 Rust,举个例子,我正在尝试编写一个简单的 UDP 代理,它只接受一个套接字上的 UDP 数据包并将其发送到多个其他目的地。但是,我偶然发现需要将接收到的数据包发送到多个地址的情况,但我不确定如何以惯用的方式做到这一点。
到目前为止我的代码:
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() {
let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
let socket = UdpSocket::bind(&listen_address).unwrap();
let peers = vec![
"192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
"192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
];
UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).for_each(
move |(bytes, _from)| {
// These are the problematic lines
for peer in peers.iter() {
socket.send_dgram(&bytes, &peer);
}
Ok(())
},
)
};
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
}
有问题的线路正在尝试使用新绑定的套接字将接收到的数据包发送到多个其他地址。
现有示例都将数据包转发到单个目的地,或者在内部使用 mpsc 通道在内部任务之间进行通信。我不认为这是必要的,并且应该可以做到而不必为每个侦听套接字生成多个任务。
更新: 感谢@Ömer-erden,我得到了这段有效的代码。
extern crate bytes;
extern crate futures;
use std::net::SocketAddr;
use tokio::codec::BytesCodec;
use tokio::net::{UdpFramed, UdpSocket};
use tokio::prelude::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let listen_address = "0.0.0.0:4711".parse::<SocketAddr>()?;
let socket = UdpSocket::bind(&listen_address)?;
let peers: Vec<SocketAddr> = vec!["192.168.1.136:8080".parse()?, "192.168.1.136:8081".parse()?];
let (mut writer, reader) = UdpFramed::new(socket, BytesCodec::new()).split();
let forwarder = reader.for_each(move |(bytes, _from)| {
for peer in peers.iter() {
writer.start_send((bytes.clone().into(), peer.clone()))?;
}
writer.poll_complete()?;
Ok(())
});
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});
Ok(())
}
注意:
不需要为每个
start_send
都调用poll_completion
:只需要在所有start_send
被调度后调用即可。出于某种原因,
peer
的内容在调用之间被删除(但没有编译器错误),生成错误 22(这通常是因为给出了错误的地址至sendto(2)
).查看调试器,很明显第二次,对等地址指向无效内存。我选择克隆
peer
。我删除了对
unwrap()
的调用并改为向上传播Result
。
你的代码有一个逻辑错误:你试图绑定同一个地址两次,分别作为发送者和接收者。相反,您可以使用 流和接收器 。 UdpFramed
具有提供该功能的功能,请参阅 Sink
:
A
Sink
is a value into which other values can be sent, asynchronously.
let listen_address = "127.0.0.1:4711".parse::<SocketAddr>().unwrap();
let forwarder = {
let (mut socket_sink, socket_stream) =
UdpFramed::new(UdpSocket::bind(&listen_address).unwrap(), BytesCodec::new()).split();
let peers = vec![
"192.168.1.136:4711".parse::<SocketAddr>().unwrap(),
"192.168.1.136:4712".parse::<SocketAddr>().unwrap(),
];
socket_stream.for_each(move |(bytes, _from)| {
for peer in peers.iter() {
socket_sink.start_send((bytes.clone().into(), *peer));
socket_sink.poll_complete();
}
Ok(())
})
};
tokio::run({
forwarder
.map_err(|err| println!("Error: {}", err))
.map(|_| ())
});