在无限循环中异步重新连接客户端到服务器

Asynchronously reconnecting a client to a server in an infinite loop

我无法创建尝试连接到服务器的客户端并且:

这是连接服务器的代码;目前,当连接丢失时,程序退出。我不确定实现它的最佳方法是什么;也许我必须创建一个带有无限循环的 Future

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle();                                                                                                                                                                                     
    let client = get_connection(&handle);                                                                                                                                                                           

    let client = client.and_then(|c| {                                                                                                                                                                              
        println!("Try to reconnect");                                                                                                                                                                               
        get_connection(&handle);                                                                                                                                                                                    
        Ok(())                                                                                                                                                                                                      
    });                                                                                                                                                                                                             

    core.run(client).unwrap();                                                                                                                                                                                      
}

添加 tokio-line crate:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }

关键问题似乎是:如何使用 Tokio 实现无限循环?通过回答这个问题,我们可以解决断开连接后无限重连的问题。 根据我编写异步代码的经验,递归似乎是解决此问题的直接方法。

UPDATE:正如 Shepmaster(以及 Tokio Gitter 的成员)所指出的,我最初的答案会泄漏内存,因为我们构建了一条在每次迭代中增长的期货链。下面是一个新的:

更新后的答案:使用 loop_fn

futures crate 中有一个函数可以完全满足您的需要。它被称为loop_fn。您可以通过将主要功能更改为以下内容来使用它:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run the get_connection function and loop again regardless of its result
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}

该函数类似于一个 for 循环,它可以根据 get_connection 的结果继续或中断(请参阅 Loop 枚举的文档)。在这种情况下,我们选择一直继续,所以会无限重连。

请注意,如果出现错误(例如,如果客户端无法连接到服务器),您的 get_connection 版本将会崩溃。如果您还想在出错后重试,则应删除对 panic!.

的调用

旧答案:使用递归

以下是我的旧答案,以防有人觉得有趣。

警告:使用下面的代码会导致内存无限增长。

使get_connection无限循环

我们想在每次客户端断开连接时调用get_connection函数,所以这正是我们要做的(看reader.and_then后面的注释):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

记住 get_connection 是 non-blocking。它只是构造了一个Box<Future>。这意味着当递归调用它时,我们仍然不会阻塞。相反,我们得到了一个新的未来,我们可以通过使用 and_then link 到前一个。如您所见,这与普通递归不同,因为堆栈不会在每次迭代时增长。

请注意,我们需要克隆 handle(参见 handle_clone),并将其移动到传递给 reader.and_then 的闭包中。这是必要的,因为闭包的寿命比函数长(它将包含在我们返回的未来)。

处理错误

您提供的代码无法处理客户端无法连接到服务器的情况(也无法处理任何其他错误)。按照上面显示的相同原则,我们可以通过将 get_connection 的结尾更改为以下内容来处理错误:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)

注意 or_else 类似于 and_then,但它对未来产生的错误进行操作。

正在从 main

中删除不必要的代码

最后,不用在main函数中使用and_then。您可以用以下代码替换您的main

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}