如何将 tokio_postgres 与 Warp 一起使用?
How do I use tokio_postgres with Warp?
我用r2d2_postgres创建了一个连接池:
fn get_connection_pool(
) -> Result<r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::tls::NoTls>>, Error> {
let manager = PostgresConnectionManager::new(
"host=localhost user=someuser password=hunter2 dbname=mydb"
.parse()
.unwrap(),
NoTls,
);
let pool = r2d2::Pool::new(manager).unwrap();
Ok(pool)
}
然后将连接池克隆成warp web请求
if let Ok(pool) = pool_conns {
let hello = warp::path!("get_quote" / "co_num" / String / "ctrl_num" / String)
.map(move |co, ctrl| autorate::get_quote(co, ctrl, pool.clone()));
warp::serve(hello).run(([127, 0, 0, 1], 8889)).await;
}
然后在请求中调用 pool.get()
let mut client = pool.get().unwrap();
但收到运行时错误
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function
(like 'block_on') attempted to block the current thread while the thread is being used to drive asynchronous tasks.
我的问题是:在 Rust 中,这两个概念应该如何协同工作?具体来说,我指的是一个 postgres 连接池和一个异步 Web 服务器。我在想我应该有一个连接池并且能够将它传递到每个请求中以根据需要分配连接。我是在使用错误的连接池,还是只是以错误的方式传递它?
reddit 上的几位好心人引导我朝着正确的方向前进。我需要一个异步连接池而不是 r2d2,所以我切换到 deadpool_postgres。最终看起来像这样:
#[tokio::main]
async fn main() {
let mut cfg = Config::new();
cfg.host("yourhost");
cfg.user("youruser");
cfg.password("yourpass");
cfg.dbname("yourdb");
let mgr = Manager::new(cfg, tokio_postgres::NoTls);
let pool = Pool::new(mgr, 16);
let get_quote = warp::path!("get_quote" / "co_num" / String / "ctrl_num" / String)
.and(warp::any().map(move || pool.clone()))
.and_then(autorate::get_quote);
warp::serve(get_quote).run(([127, 0, 0, 1], 8889)).await;
}
然后使用连接:
pub async fn get_quote(
co: String,
ctrl: String,
pool: deadpool_postgres::Pool,
) -> Result<impl warp::Reply, std::convert::Infallible> {
let co_result = Decimal::from_str(&co);
let ctrl_result = Decimal::from_str(&ctrl);
let client = pool.get().await.unwrap();
if let (Ok(co_num), Ok(ctrl_num)) = (co_result, ctrl_result) {
let orders_result = get_orders(&client, &co_num, &ctrl_num).await;
if let Ok(orders) = orders_result {
if let Ok(rated_orders) = rate_orders(orders, &client).await {
return Ok(warp::reply::json(&rated_orders));
}
}
}
Ok(warp::reply::json(&"No results".to_string()))
}
async fn get_orders(
client: &deadpool_postgres::Client,
co: &Decimal,
ctrl: &Decimal,
) -> Result<Vec<Order>, Error> {
for row in client
.query().await
...
我用r2d2_postgres创建了一个连接池:
fn get_connection_pool(
) -> Result<r2d2::Pool<r2d2_postgres::PostgresConnectionManager<postgres::tls::NoTls>>, Error> {
let manager = PostgresConnectionManager::new(
"host=localhost user=someuser password=hunter2 dbname=mydb"
.parse()
.unwrap(),
NoTls,
);
let pool = r2d2::Pool::new(manager).unwrap();
Ok(pool)
}
然后将连接池克隆成warp web请求
if let Ok(pool) = pool_conns {
let hello = warp::path!("get_quote" / "co_num" / String / "ctrl_num" / String)
.map(move |co, ctrl| autorate::get_quote(co, ctrl, pool.clone()));
warp::serve(hello).run(([127, 0, 0, 1], 8889)).await;
}
然后在请求中调用 pool.get()
let mut client = pool.get().unwrap();
但收到运行时错误
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function
(like 'block_on') attempted to block the current thread while the thread is being used to drive asynchronous tasks.
我的问题是:在 Rust 中,这两个概念应该如何协同工作?具体来说,我指的是一个 postgres 连接池和一个异步 Web 服务器。我在想我应该有一个连接池并且能够将它传递到每个请求中以根据需要分配连接。我是在使用错误的连接池,还是只是以错误的方式传递它?
reddit 上的几位好心人引导我朝着正确的方向前进。我需要一个异步连接池而不是 r2d2,所以我切换到 deadpool_postgres。最终看起来像这样:
#[tokio::main]
async fn main() {
let mut cfg = Config::new();
cfg.host("yourhost");
cfg.user("youruser");
cfg.password("yourpass");
cfg.dbname("yourdb");
let mgr = Manager::new(cfg, tokio_postgres::NoTls);
let pool = Pool::new(mgr, 16);
let get_quote = warp::path!("get_quote" / "co_num" / String / "ctrl_num" / String)
.and(warp::any().map(move || pool.clone()))
.and_then(autorate::get_quote);
warp::serve(get_quote).run(([127, 0, 0, 1], 8889)).await;
}
然后使用连接:
pub async fn get_quote(
co: String,
ctrl: String,
pool: deadpool_postgres::Pool,
) -> Result<impl warp::Reply, std::convert::Infallible> {
let co_result = Decimal::from_str(&co);
let ctrl_result = Decimal::from_str(&ctrl);
let client = pool.get().await.unwrap();
if let (Ok(co_num), Ok(ctrl_num)) = (co_result, ctrl_result) {
let orders_result = get_orders(&client, &co_num, &ctrl_num).await;
if let Ok(orders) = orders_result {
if let Ok(rated_orders) = rate_orders(orders, &client).await {
return Ok(warp::reply::json(&rated_orders));
}
}
}
Ok(warp::reply::json(&"No results".to_string()))
}
async fn get_orders(
client: &deadpool_postgres::Client,
co: &Decimal,
ctrl: &Decimal,
) -> Result<Vec<Order>, Error> {
for row in client
.query().await
...