如何创建 Tokio 计时器来消除网络数据包接收的抖动?
How can I create a Tokio timer to debounce reception of network packets?
问题
我已经实现了一个函数,该函数 return 发送的数据包中缺少索引的数组。如果发送 100 个数据包,则服务器将有一个索引向量,其中包含 0(缺失)和 1(未缺失)。我不想每次都触发这个,只有在没有收到数据包的情况下有轻微的延迟时才触发。我想将我的同步函数更改为 asynchronous debouncing function
我尝试解决去抖问题
我正在寻找一个解决方案来实现一个计时器(比如 300 毫秒),它的值会不断被不同的线程覆盖。一旦它的值不再被覆盖,它应该触发一个代码块或函数。我正在使用 Tokio。
这是我想要实现的伪代码:
// thanks
fn get_epoch() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
}
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
socket,
mut buf,
mut to_send,
} = self;
let mut timer_delay = get_epoch();
loop {
if let Some((size, peer)) = to_send {
timer_delay = get_epoch(); // "reset" the to a closer value
}
futures::join!(
/* execute a block of code if true*/
if get_epoch() - timer_delay > 300,
/* else (default case):*/
to_send = Some(socket.recv_from(&mut buf)
);
}
}
}
我的项目基于 Tokio 中的以下示例:
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
socket,
mut buf,
mut to_send,
} = self;
loop {
// First we check to see if there's a message we need to echo back.
// If so then we try to send it back to the original source, waiting
// until it's writable and we're able to do so.
if let Some((size, peer)) = to_send {
let amt = socket.send_to(&buf[..size], &peer).await?;
println!("Echoed {}/{} bytes to {}", amt, size, peer);
}
// If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back.
to_send = Some(socket.recv_from(&mut buf).await?);
}
}
}
生成另一个用于去抖动的 Tokio 任务,它将监听一个频道。您可以通过使用超时来判断通道何时没有收到任何东西。当超时发生时,这就是您应该执行不常操作的信号。不要忘记在频道关闭时执行该操作:
use std::time::Duration;
use tokio::{sync::mpsc, task, time}; // 1.3.0
#[tokio::main]
async fn main() {
let (debounce_tx, mut debounce_rx) = mpsc::channel(10);
let (network_tx, mut network_rx) = mpsc::channel(10);
// Listen for events
let debouncer = task::spawn(async move {
let duration = Duration::from_millis(10);
loop {
match time::timeout(duration, debounce_rx.recv()).await {
Ok(Some(())) => {
eprintln!("Network activity")
}
Ok(None) => {
eprintln!("Debounce finished");
break;
}
Err(_) => {
eprintln!("{:?} since network activity", duration)
}
}
}
});
// Listen for network activity
let server = task::spawn({
let debounce_tx = debounce_tx.clone();
async move {
while let Some(packet) = network_rx.recv().await {
// Received a packet
debounce_tx
.send(())
.await
.expect("Unable to talk to debounce");
eprintln!("Received a packet: {:?}", packet);
}
}
});
// Prevent deadlocks
drop(debounce_tx);
// Drive the network input
network_tx.send(1).await.expect("Unable to talk to network");
network_tx.send(2).await.expect("Unable to talk to network");
network_tx.send(3).await.expect("Unable to talk to network");
time::sleep(Duration::from_millis(20)).await;
network_tx.send(4).await.expect("Unable to talk to network");
network_tx.send(5).await.expect("Unable to talk to network");
network_tx.send(6).await.expect("Unable to talk to network");
time::sleep(Duration::from_millis(20)).await;
// Close the network
drop(network_tx);
// Wait for everything to finish
server.await.expect("Server panicked");
debouncer.await.expect("Debouncer panicked");
}
Received a packet: 1
Received a packet: 2
Received a packet: 3
Network activity
Network activity
Network activity
10ms since network activity
10ms since network activity
Received a packet: 4
Received a packet: 5
Received a packet: 6
Network activity
Network activity
Network activity
10ms since network activity
Debounce finished
问题
我已经实现了一个函数,该函数 return 发送的数据包中缺少索引的数组。如果发送 100 个数据包,则服务器将有一个索引向量,其中包含 0(缺失)和 1(未缺失)。我不想每次都触发这个,只有在没有收到数据包的情况下有轻微的延迟时才触发。我想将我的同步函数更改为 asynchronous debouncing function
我尝试解决去抖问题
我正在寻找一个解决方案来实现一个计时器(比如 300 毫秒),它的值会不断被不同的线程覆盖。一旦它的值不再被覆盖,它应该触发一个代码块或函数。我正在使用 Tokio。
这是我想要实现的伪代码:
// thanks
fn get_epoch() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
}
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
socket,
mut buf,
mut to_send,
} = self;
let mut timer_delay = get_epoch();
loop {
if let Some((size, peer)) = to_send {
timer_delay = get_epoch(); // "reset" the to a closer value
}
futures::join!(
/* execute a block of code if true*/
if get_epoch() - timer_delay > 300,
/* else (default case):*/
to_send = Some(socket.recv_from(&mut buf)
);
}
}
}
我的项目基于 Tokio 中的以下示例:
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
socket,
mut buf,
mut to_send,
} = self;
loop {
// First we check to see if there's a message we need to echo back.
// If so then we try to send it back to the original source, waiting
// until it's writable and we're able to do so.
if let Some((size, peer)) = to_send {
let amt = socket.send_to(&buf[..size], &peer).await?;
println!("Echoed {}/{} bytes to {}", amt, size, peer);
}
// If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back.
to_send = Some(socket.recv_from(&mut buf).await?);
}
}
}
生成另一个用于去抖动的 Tokio 任务,它将监听一个频道。您可以通过使用超时来判断通道何时没有收到任何东西。当超时发生时,这就是您应该执行不常操作的信号。不要忘记在频道关闭时执行该操作:
use std::time::Duration;
use tokio::{sync::mpsc, task, time}; // 1.3.0
#[tokio::main]
async fn main() {
let (debounce_tx, mut debounce_rx) = mpsc::channel(10);
let (network_tx, mut network_rx) = mpsc::channel(10);
// Listen for events
let debouncer = task::spawn(async move {
let duration = Duration::from_millis(10);
loop {
match time::timeout(duration, debounce_rx.recv()).await {
Ok(Some(())) => {
eprintln!("Network activity")
}
Ok(None) => {
eprintln!("Debounce finished");
break;
}
Err(_) => {
eprintln!("{:?} since network activity", duration)
}
}
}
});
// Listen for network activity
let server = task::spawn({
let debounce_tx = debounce_tx.clone();
async move {
while let Some(packet) = network_rx.recv().await {
// Received a packet
debounce_tx
.send(())
.await
.expect("Unable to talk to debounce");
eprintln!("Received a packet: {:?}", packet);
}
}
});
// Prevent deadlocks
drop(debounce_tx);
// Drive the network input
network_tx.send(1).await.expect("Unable to talk to network");
network_tx.send(2).await.expect("Unable to talk to network");
network_tx.send(3).await.expect("Unable to talk to network");
time::sleep(Duration::from_millis(20)).await;
network_tx.send(4).await.expect("Unable to talk to network");
network_tx.send(5).await.expect("Unable to talk to network");
network_tx.send(6).await.expect("Unable to talk to network");
time::sleep(Duration::from_millis(20)).await;
// Close the network
drop(network_tx);
// Wait for everything to finish
server.await.expect("Server panicked");
debouncer.await.expect("Debouncer panicked");
}
Received a packet: 1
Received a packet: 2
Received a packet: 3
Network activity
Network activity
Network activity
10ms since network activity
10ms since network activity
Received a packet: 4
Received a packet: 5
Received a packet: 6
Network activity
Network activity
Network activity
10ms since network activity
Debounce finished