Rust warp+sqlx 服务:将 DBPool 从 main 传递到处理程序的惯用方式
Rust warp+sqlx service : idiomatic way of passing DBPool from main to handlers
一个 Rust 新手,尝试通过结合
来编写一个 web 服务
https://github.com/seanmonstar/warp/blob/master/examples/todos.rs and https://github.com/launchbadge/sqlx/blob/master/examples/postgres/todos/src/main.rs
以下代码处于运行状态。我的问题是,我是否需要为每个处理程序克隆 dbpool? Rust 中的惯用方式是什么(我来自 Java->Kotlin->Go 背景,FWIW)
#![deny(warnings)]
use sqlx::postgres::{PgPoolOptions};
use std::env;
use warp::Filter;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://:@localhost/todo_db").await?;
if env::var_os("RUST_LOG").is_none() {
env::set_var("RUST_LOG", "todos=info");
}
pretty_env_logger::init();
let api = filters::todos(pool);
let routes = api.with(warp::log("todos"));
// Start up the server...
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
Ok(())
}
mod filters {
use sqlx::{Pool, Postgres};
use super::handlers;
use super::models::{ListOptions, Todo};
use warp::Filter;
pub fn todos(
db: Pool<Postgres>,
) -> impl Filter<Extract=impl warp::Reply, Error=warp::Rejection> + Clone {
todos_list(db)
}
/// GET /todos?offset=3&limit=5
pub fn todos_list(
db: Pool<Postgres>,
) -> impl Filter<Extract=impl warp::Reply, Error=warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::get())
.and(warp::query::<ListOptions>())
.and(with_db(db))
.and_then(handlers::list_todos)
}
fn with_db(db: Pool<Postgres>) -> impl Filter<Extract=(Pool<Postgres>, ), Error=std::convert::Infallible> + Clone {
warp::any().map(move || db.clone())
}
fn _json_body() -> impl Filter<Extract=(Todo, ), Error=warp::Rejection> + Clone {
warp::body::content_length_limit(1024 * 16).and(warp::body::json())
}
}
mod handlers {
use super::models::{ListOptions};
use std::convert::Infallible;
use sqlx::{Pool, Postgres};
use crate::models::Todo;
pub async fn list_todos(_opts: ListOptions, db: Pool<Postgres>) -> Result<impl warp::Reply, Infallible> {
let recs = sqlx::query!(
r#"
SELECT id, description, done
FROM todos
ORDER BY id
"#
)
.fetch_all(&db).await.expect("Some error message");
let x: Vec<Todo> = recs.iter().map(|rec| {
Todo { id: rec.id, text: rec.description.clone(), completed: rec.done }
}).collect();
Ok(warp::reply::json(&x))
}
}
mod models {
use serde_derive::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct Todo {
pub id: i64,
pub text: String,
pub completed: bool,
}
// The query parameters for list_todos.
#[derive(Debug, Deserialize)]
pub struct ListOptions {
pub offset: Option<usize>,
pub limit: Option<usize>,
}
}
虽然 copy
ing 一个池只会增加 Arc
中的引用计数器并且相对便宜,正如@cdhowie 指出的那样,如果你喜欢这样做,你可以避免它:.fetch_all(db)
只需要一个不可变的引用。因此,您可以传入 &'static Pool<…>
。一个棘手的事情是:你不能直接声明 a
static POOL: Pool<Postgres> = …;
因为您没有什么可以为 …
添加内容。初始化statics时只能用const fn
,不能用.await
.
相反,您可以使用 OnceCell
。存在多种变体,tokio
中包含的变体可能在这里最方便:
static POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
POOL.get_or_try_init(|| async {
PgPoolOptions::new()
.max_connections(5)
.connect("postgres://:@localhost/todo_db")
.await
})
.await?;
// Later, just access your pool with POOL.get().unwrap()
// You don't need the with_db filter anymore
但就个人而言,我更喜欢使用 Box::leak(Box::new(PgPoolOptions()….await?))
创建与应用程序本身一样长的连接或池。如果您认为这很糟糕,因为它(显然……)泄漏了内存,请考虑一下:OnceCell
也永远不会被删除或释放。这也意味着 OnceCell
和 Box::leak
都不允许完全关闭连接池,理论上您的代码与内部 Arc
s 可以做到这一点。
一个 Rust 新手,尝试通过结合
来编写一个 web 服务https://github.com/seanmonstar/warp/blob/master/examples/todos.rs and https://github.com/launchbadge/sqlx/blob/master/examples/postgres/todos/src/main.rs
以下代码处于运行状态。我的问题是,我是否需要为每个处理程序克隆 dbpool? Rust 中的惯用方式是什么(我来自 Java->Kotlin->Go 背景,FWIW)
#![deny(warnings)]
use sqlx::postgres::{PgPoolOptions};
use std::env;
use warp::Filter;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://:@localhost/todo_db").await?;
if env::var_os("RUST_LOG").is_none() {
env::set_var("RUST_LOG", "todos=info");
}
pretty_env_logger::init();
let api = filters::todos(pool);
let routes = api.with(warp::log("todos"));
// Start up the server...
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
Ok(())
}
mod filters {
use sqlx::{Pool, Postgres};
use super::handlers;
use super::models::{ListOptions, Todo};
use warp::Filter;
pub fn todos(
db: Pool<Postgres>,
) -> impl Filter<Extract=impl warp::Reply, Error=warp::Rejection> + Clone {
todos_list(db)
}
/// GET /todos?offset=3&limit=5
pub fn todos_list(
db: Pool<Postgres>,
) -> impl Filter<Extract=impl warp::Reply, Error=warp::Rejection> + Clone {
warp::path!("todos")
.and(warp::get())
.and(warp::query::<ListOptions>())
.and(with_db(db))
.and_then(handlers::list_todos)
}
fn with_db(db: Pool<Postgres>) -> impl Filter<Extract=(Pool<Postgres>, ), Error=std::convert::Infallible> + Clone {
warp::any().map(move || db.clone())
}
fn _json_body() -> impl Filter<Extract=(Todo, ), Error=warp::Rejection> + Clone {
warp::body::content_length_limit(1024 * 16).and(warp::body::json())
}
}
mod handlers {
use super::models::{ListOptions};
use std::convert::Infallible;
use sqlx::{Pool, Postgres};
use crate::models::Todo;
pub async fn list_todos(_opts: ListOptions, db: Pool<Postgres>) -> Result<impl warp::Reply, Infallible> {
let recs = sqlx::query!(
r#"
SELECT id, description, done
FROM todos
ORDER BY id
"#
)
.fetch_all(&db).await.expect("Some error message");
let x: Vec<Todo> = recs.iter().map(|rec| {
Todo { id: rec.id, text: rec.description.clone(), completed: rec.done }
}).collect();
Ok(warp::reply::json(&x))
}
}
mod models {
use serde_derive::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct Todo {
pub id: i64,
pub text: String,
pub completed: bool,
}
// The query parameters for list_todos.
#[derive(Debug, Deserialize)]
pub struct ListOptions {
pub offset: Option<usize>,
pub limit: Option<usize>,
}
}
虽然 copy
ing 一个池只会增加 Arc
中的引用计数器并且相对便宜,正如@cdhowie 指出的那样,如果你喜欢这样做,你可以避免它:.fetch_all(db)
只需要一个不可变的引用。因此,您可以传入 &'static Pool<…>
。一个棘手的事情是:你不能直接声明 a
static POOL: Pool<Postgres> = …;
因为您没有什么可以为 …
添加内容。初始化statics时只能用const fn
,不能用.await
.
相反,您可以使用 OnceCell
。存在多种变体,tokio
中包含的变体可能在这里最方便:
static POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
POOL.get_or_try_init(|| async {
PgPoolOptions::new()
.max_connections(5)
.connect("postgres://:@localhost/todo_db")
.await
})
.await?;
// Later, just access your pool with POOL.get().unwrap()
// You don't need the with_db filter anymore
但就个人而言,我更喜欢使用 Box::leak(Box::new(PgPoolOptions()….await?))
创建与应用程序本身一样长的连接或池。如果您认为这很糟糕,因为它(显然……)泄漏了内存,请考虑一下:OnceCell
也永远不会被删除或释放。这也意味着 OnceCell
和 Box::leak
都不允许完全关闭连接池,理论上您的代码与内部 Arc
s 可以做到这一点。