在无限循环中异步重新连接客户端到服务器
Asynchronously reconnecting a client to a server in an infinite loop
我无法创建尝试连接到服务器的客户端并且:
- 如果服务器宕机,它必须在无限循环中重试
- 如果服务器已启动且连接成功,当连接丢失时(即服务器断开客户端),客户端必须重新启动无限循环以尝试连接到服务器
这是连接服务器的代码;目前,当连接丢失时,程序退出。我不确定实现它的最佳方法是什么;也许我必须创建一个带有无限循环的 Future
?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
let client = client.and_then(|c| {
println!("Try to reconnect");
get_connection(&handle);
Ok(())
});
core.run(client).unwrap();
}
添加 tokio-line crate:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
关键问题似乎是:如何使用 Tokio 实现无限循环?通过回答这个问题,我们可以解决断开连接后无限重连的问题。 根据我编写异步代码的经验,递归似乎是解决此问题的直接方法。
UPDATE:正如 Shepmaster(以及 Tokio Gitter 的成员)所指出的,我最初的答案会泄漏内存,因为我们构建了一条在每次迭代中增长的期货链。下面是一个新的:
更新后的答案:使用 loop_fn
futures
crate 中有一个函数可以完全满足您的需要。它被称为loop_fn
。您可以通过将主要功能更改为以下内容来使用它:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
该函数类似于一个 for 循环,它可以根据 get_connection
的结果继续或中断(请参阅 Loop
枚举的文档)。在这种情况下,我们选择一直继续,所以会无限重连。
请注意,如果出现错误(例如,如果客户端无法连接到服务器),您的 get_connection
版本将会崩溃。如果您还想在出错后重试,则应删除对 panic!
.
的调用
旧答案:使用递归
以下是我的旧答案,以防有人觉得有趣。
警告:使用下面的代码会导致内存无限增长。
使get_connection
无限循环
我们想在每次客户端断开连接时调用get_connection
函数,所以这正是我们要做的(看reader.and_then
后面的注释):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
记住 get_connection
是 non-blocking。它只是构造了一个Box<Future>
。这意味着当递归调用它时,我们仍然不会阻塞。相反,我们得到了一个新的未来,我们可以通过使用 and_then
link 到前一个。如您所见,这与普通递归不同,因为堆栈不会在每次迭代时增长。
请注意,我们需要克隆 handle
(参见 handle_clone
),并将其移动到传递给 reader.and_then
的闭包中。这是必要的,因为闭包的寿命比函数长(它将包含在我们返回的未来)。
处理错误
您提供的代码无法处理客户端无法连接到服务器的情况(也无法处理任何其他错误)。按照上面显示的相同原则,我们可以通过将 get_connection
的结尾更改为以下内容来处理错误:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
注意 or_else
类似于 and_then
,但它对未来产生的错误进行操作。
正在从 main
中删除不必要的代码
最后,不用在main
函数中使用and_then
。您可以用以下代码替换您的main
:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}
我无法创建尝试连接到服务器的客户端并且:
- 如果服务器宕机,它必须在无限循环中重试
- 如果服务器已启动且连接成功,当连接丢失时(即服务器断开客户端),客户端必须重新启动无限循环以尝试连接到服务器
这是连接服务器的代码;目前,当连接丢失时,程序退出。我不确定实现它的最佳方法是什么;也许我必须创建一个带有无限循环的 Future
?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
let client = client.and_then(|c| {
println!("Try to reconnect");
get_connection(&handle);
Ok(())
});
core.run(client).unwrap();
}
添加 tokio-line crate:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
关键问题似乎是:如何使用 Tokio 实现无限循环?通过回答这个问题,我们可以解决断开连接后无限重连的问题。 根据我编写异步代码的经验,递归似乎是解决此问题的直接方法。
UPDATE:正如 Shepmaster(以及 Tokio Gitter 的成员)所指出的,我最初的答案会泄漏内存,因为我们构建了一条在每次迭代中增长的期货链。下面是一个新的:
更新后的答案:使用 loop_fn
futures
crate 中有一个函数可以完全满足您的需要。它被称为loop_fn
。您可以通过将主要功能更改为以下内容来使用它:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
该函数类似于一个 for 循环,它可以根据 get_connection
的结果继续或中断(请参阅 Loop
枚举的文档)。在这种情况下,我们选择一直继续,所以会无限重连。
请注意,如果出现错误(例如,如果客户端无法连接到服务器),您的 get_connection
版本将会崩溃。如果您还想在出错后重试,则应删除对 panic!
.
旧答案:使用递归
以下是我的旧答案,以防有人觉得有趣。
警告:使用下面的代码会导致内存无限增长。
使get_connection
无限循环
我们想在每次客户端断开连接时调用get_connection
函数,所以这正是我们要做的(看reader.and_then
后面的注释):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
记住 get_connection
是 non-blocking。它只是构造了一个Box<Future>
。这意味着当递归调用它时,我们仍然不会阻塞。相反,我们得到了一个新的未来,我们可以通过使用 and_then
link 到前一个。如您所见,这与普通递归不同,因为堆栈不会在每次迭代时增长。
请注意,我们需要克隆 handle
(参见 handle_clone
),并将其移动到传递给 reader.and_then
的闭包中。这是必要的,因为闭包的寿命比函数长(它将包含在我们返回的未来)。
处理错误
您提供的代码无法处理客户端无法连接到服务器的情况(也无法处理任何其他错误)。按照上面显示的相同原则,我们可以通过将 get_connection
的结尾更改为以下内容来处理错误:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
注意 or_else
类似于 and_then
,但它对未来产生的错误进行操作。
正在从 main
中删除不必要的代码
最后,不用在main
函数中使用and_then
。您可以用以下代码替换您的main
:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}