hyper 与 bb8 和 postgres 的使用示例

Example usage of hyper with bb8 and postgres

我想使用 hyper with bb8 and tokio-postgres。在每个请求中,我都想从池中获取一个新连接。任何人都可以为这种情况提供一些例子吗? 目前我是这样做的:

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();

    let pg_mgr =
        PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);

    rt::run(future::lazy(move || {
        Pool::builder()
            .build(pg_mgr)
            .map_err(|e| eprintln!("Database error: {}", e))
            .and_then(move |pool| {
                let service = || service_fn(|req| router(req, pool.clone()));

                let server = Server::bind(&addr)
                    .serve(service)
                    .map_err(|e| eprintln!("Server error: {}", e));

                println!("Listening on http://{}", addr);
                server
            })
    }))
}

fn router(
    _req: Request<Body>,
    _pool: Pool<PostgresConnectionManager<NoTls>>,
) -> Result<Response<Body>, hyper::Error> {
    // do some staff with pool
}

但它不会编译:

error[E0597]: `pool` does not live long enough
  --> src/main.rs:22:63
   |
22 |                 let service = || service_fn(|req| router(req, pool.clone()));
   |                               -- -----------------------------^^^^----------
   |                               |  |                            |
   |                               |  |                            borrowed value does not live long enough
   |                               |  returning this value requires that `pool` is borrowed for `'static`
   |                               value captured here
...
30 |             })
   |             - `pool` dropped here while still borrowed

我做错了什么?如何使我的案例正常工作?

解决方案非常简单,但为了理解问题,我想提供一些额外的信息...

  1. 当您在未来调用 and_then 以获取结果时,它会将变量的值传递给传递给 and_then 的闭包,从而使您拥有该数据的所有权.

  2. hypers 生成器上的方法 serve(由 Server::bind 返回)期望闭包具有静态生命周期。

现在解决问题:

  • 好:将闭包的值传递给服务,这会移动它,转移所有权。
  • 好:service_fn 是在 and_then 闭包之外定义的,因此函数寿命足够长
  • 不好:闭包使用局部变量池将其传递给service_fn

要解决此问题,只需 move 将本地数据放入您的闭包中,如下所示:

let service = move || service_fn(|req| router(req, pool));

找到解决方案here

最简单的解决方案如下:

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();

    let pg_mgr =
        PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);

    rt::run(future::lazy(move || {
        Pool::builder()
            .build(pg_mgr)
            .map_err(|_| eprintln!("kek"))
            .and_then(move |pool| {
                let service = move || {
                    let pool = pool.clone();
                    service_fn(move |req| router(req, &pool))
                };

                let server = Server::bind(&addr)
                    .serve(service)
                    .map_err(|e| eprintln!("Server error: {}", e));

                println!("Listening on http://{}", addr);
                server
            })
    }))
}

fn router(
    _req: Request<Body>,
    _pool: &Pool<PostgresConnectionManager<NoTls>>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    // some staff
}

也可以在 rt::run 之外构造 serviceArcMutex:

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();

    let pg_mgr =
        PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);
    let pool: Arc<Mutex<Option<Pool<PostgresConnectionManager<NoTls>>>>> =
        Arc::new(Mutex::new(None));
    let pool2 = pool.clone();

    let service = move || {
        let pool = pool.clone();
        service_fn(move |req| {
            let locked = pool.lock().unwrap();
            let pool = locked
                .as_ref()
                .expect("bb8 should be initialized before hyper");
            router(req, pool)
        })
    };

    rt::run(future::lazy(move || {
        Pool::builder()
            .build(pg_mgr)
            .map_err(|_| eprintln!("kek"))
            .and_then(move |pool| {
                *pool2.lock().unwrap() = Some(pool);

                let server = Server::bind(&addr)
                    .serve(service)
                    .map_err(|e| eprintln!("Server error: {}", e));

                println!("Listening on http://{}", addr);
                server
            })
    }))
}

fn router(
    _req: Request<Body>,
    _pool: &Pool<PostgresConnectionManager<NoTls>>,
) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    // some staff
}